summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-01-25 16:25:05 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-01-25 16:25:05 +0000
commitfadd169527df40c715e549f538d256fc23bab3da (patch)
tree4b95900eb03adeb45984c5451698eb499c6482d1
parentf690f1d9aecfd8021dda72714923b55cfe8d607a (diff)
downloadqpid-python-fadd169527df40c715e549f538d256fc23bab3da.tar.gz
Move the QMFv2 implementation to its own directory.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@902858 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qmf2/agent.py758
-rw-r--r--qpid/python/qmf2/common.py1881
-rw-r--r--qpid/python/qmf2/console.py1959
-rw-r--r--qpid/python/qmf2/tests/__init__.py22
-rw-r--r--qpid/python/qmf2/tests/agent_discovery.py320
-rw-r--r--qpid/python/qmf2/tests/agent_test.py167
-rw-r--r--qpid/python/qmf2/tests/basic_method.py348
-rw-r--r--qpid/python/qmf2/tests/basic_query.py336
-rw-r--r--qpid/python/qmf2/tests/console_test.py175
-rw-r--r--qpid/python/qmf2/tests/events.py193
-rw-r--r--qpid/python/qmf2/tests/obj_gets.py399
11 files changed, 6558 insertions, 0 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py
new file mode 100644
index 0000000000..c6a518ca31
--- /dev/null
+++ b/qpid/python/qmf2/agent.py
@@ -0,0 +1,758 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import logging
+import datetime
+import time
+import Queue
+from threading import Thread, Lock, currentThread
+from qpid.messaging import Connection, Message, Empty, SendError
+from uuid import uuid4
+from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+ makeSubject, parseSubject, OpCode, QmfQuery,
+ SchemaObjectClass, MsgKey, QmfData, QmfAddress,
+ SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
+
+# global flag that indicates which thread (if any) is
+# running the agent notifier callback
+_callback_thread=None
+
+ ##==============================================================================
+ ## METHOD CALL
+ ##==============================================================================
+
+class _MethodCallHandle(object):
+ """
+ Private class used to hold context when handing off a method call to the
+ application. Given to the app in a WorkItem, provided to the agent when
+ method_response() is invoked.
+ """
+ def __init__(self, correlation_id, reply_to, meth_name, _oid=None):
+ self.correlation_id = correlation_id
+ self.reply_to = reply_to
+ self.meth_name = meth_name
+ self.oid = _oid
+
+class MethodCallParams(object):
+ """
+ """
+ def __init__(self, name, _oid=None, _in_args=None, _user_id=None):
+ self._meth_name = name
+ self._oid = _oid
+ self._in_args = _in_args
+ self._user_id = _user_id
+
+ def get_name(self):
+ return self._meth_name
+
+ def get_object_id(self):
+ return self._oid
+
+ def get_args(self):
+ return self._in_args
+
+ def get_user_id(self):
+ return self._user_id
+
+
+
+ ##==============================================================================
+ ## AGENT
+ ##==============================================================================
+
+class Agent(Thread):
+ def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
+ _max_msg_size=0, _capacity=10):
+ Thread.__init__(self)
+ self._running = False
+
+ self.name = str(name)
+ self._domain = _domain
+ self._notifier = _notifier
+ self._heartbeat_interval = _heartbeat_interval
+ self._max_msg_size = _max_msg_size
+ self._capacity = _capacity
+
+ self._conn = None
+ self._session = None
+ self._direct_receiver = None
+ self._locate_receiver = None
+ self._ind_sender = None
+ self._event_sender = None
+
+ self._lock = Lock()
+ self._packages = {}
+ self._schema_timestamp = long(0)
+ self._schema = {}
+ self._agent_data = {}
+ self._work_q = Queue.Queue()
+ self._work_q_put = False
+
+
+ def destroy(self, timeout=None):
+ """
+ Must be called before the Agent is deleted.
+ Frees up all resources and shuts down all background threads.
+
+ @type timeout: float
+ @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
+ """
+ logging.debug("Destroying Agent %s" % self.name)
+ if self._conn:
+ self.remove_connection(timeout)
+ logging.debug("Agent Destroyed")
+
+
+ def get_name(self):
+ return self.name
+
+ def set_connection(self, conn):
+ self._conn = conn
+ self._session = self._conn.session()
+
+ my_addr = QmfAddress.direct(self.name, self._domain)
+ self._direct_receiver = self._session.receiver(str(my_addr) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}",
+ capacity=self._capacity)
+ logging.debug("my direct addr=%s" % self._direct_receiver.source)
+
+ locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+ self._locate_receiver = self._session.receiver(str(locate_addr) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic}}",
+ capacity=self._capacity)
+ logging.debug("agent.locate addr=%s" % self._locate_receiver.source)
+
+
+ ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+ self._ind_sender = self._session.sender(str(ind_addr) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic}}")
+ logging.debug("agent.ind addr=%s" % self._ind_sender.target)
+
+ my_events = QmfAddress.topic(self.name, self._domain)
+ self._event_sender = self._session.sender(str(my_events) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic}}")
+ logging.debug("my event addr=%s" % self._event_sender.target)
+
+ self._running = True
+ self.start()
+
+ def remove_connection(self, timeout=None):
+ # tell connection thread to shutdown
+ self._running = False
+ if self.isAlive():
+ # kick my thread to wake it up
+ my_addr = QmfAddress.direct(self.name, self._domain)
+ logging.debug("Making temp sender for [%s]" % str(my_addr))
+ tmp_sender = self._session.sender(str(my_addr))
+ try:
+ msg = Message(subject=makeSubject(OpCode.noop))
+ tmp_sender.send( msg, sync=True )
+ except SendError, e:
+ logging.error(str(e))
+ logging.debug("waiting for agent receiver thread to exit")
+ self.join(timeout)
+ if self.isAlive():
+ logging.error( "Agent thread '%s' is hung..." % self.name)
+ self._direct_receiver.close()
+ self._direct_receiver = None
+ self._locate_receiver.close()
+ self._locate_receiver = None
+ self._ind_sender.close()
+ self._ind_sender = None
+ self._event_sender.close()
+ self._event_sender = None
+ self._session.close()
+ self._session = None
+ self._conn = None
+ logging.debug("agent connection removal complete")
+
+ def register_object_class(self, schema):
+ """
+ Register an instance of a SchemaClass with this agent
+ """
+ # @todo: need to update subscriptions
+ # @todo: need to mark schema as "non-const"
+ if not isinstance(schema, SchemaClass):
+ raise TypeError("SchemaClass instance expected")
+
+ self._lock.acquire()
+ try:
+ classId = schema.get_class_id()
+ pname = classId.get_package_name()
+ cname = classId.get_class_name()
+ if pname not in self._packages:
+ self._packages[pname] = [cname]
+ else:
+ if cname not in self._packages[pname]:
+ self._packages[pname].append(cname)
+ self._schema[classId] = schema
+ self._schema_timestamp = long(time.time() * 1000)
+ finally:
+ self._lock.release()
+
+ def register_event_class(self, schema):
+ return self.register_object_class(schema)
+
+ def raise_event(self, qmfEvent):
+ """
+ TBD
+ """
+ if not self._event_sender:
+ raise Exception("No connection available")
+
+ # @todo: should we validate against the schema?
+ _map = {"_name": self.get_name(),
+ "_event": qmfEvent.map_encode()}
+ msg = Message(subject=makeSubject(OpCode.event_ind),
+ properties={"method":"response"},
+ content={MsgKey.event:_map})
+ self._event_sender.send(msg)
+
+ def add_object(self, data ):
+ """
+ Register an instance of a QmfAgentData object.
+ """
+ # @todo: need to update subscriptions
+ # @todo: need to mark schema as "non-const"
+ if not isinstance(data, QmfAgentData):
+ raise TypeError("QmfAgentData instance expected")
+
+ id_ = data.get_object_id()
+ if not id_:
+ raise TypeError("No identifier assigned to QmfAgentData!")
+
+ self._lock.acquire()
+ try:
+ self._agent_data[id_] = data
+ finally:
+ self._lock.release()
+
+ def get_object(self, id):
+ self._lock.acquire()
+ try:
+ data = self._agent_data.get(id)
+ finally:
+ self._lock.release()
+ return data
+
+
+ def method_response(self, handle, _out_args=None, _error=None):
+ """
+ """
+ if not isinstance(handle, _MethodCallHandle):
+ raise TypeError("Invalid handle passed to method_response!")
+
+ _map = {SchemaMethod.KEY_NAME:handle.meth_name}
+ if handle.oid is not None:
+ _map[QmfData.KEY_OBJECT_ID] = handle.oid
+ if _out_args is not None:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
+ if _error is not None:
+ if not isinstance(_error, QmfData):
+ raise TypeError("Invalid type for error - must be QmfData")
+ _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
+
+ msg = Message(subject=makeSubject(OpCode.response),
+ properties={"method":"response"},
+ content={MsgKey.method:_map})
+ msg.correlation_id = handle.correlation_id
+
+ try:
+ tmp_snd = self._session.sender( handle.reply_to )
+ tmp_snd.send(msg)
+ logging.debug("method-response sent to [%s]" % handle.reply_to)
+ except SendError, e:
+ logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e)))
+
+ def get_workitem_count(self):
+ """
+ Returns the count of pending WorkItems that can be retrieved.
+ """
+ return self._work_q.qsize()
+
+ def get_next_workitem(self, timeout=None):
+ """
+ Obtains the next pending work item, or None if none available.
+ """
+ try:
+ wi = self._work_q.get(True, timeout)
+ except Queue.Empty:
+ return None
+ return wi
+
+ def release_workitem(self, wi):
+ """
+ Releases a WorkItem instance obtained by getNextWorkItem(). Called when
+ the application has finished processing the WorkItem.
+ """
+ pass
+
+
+ def run(self):
+ global _callback_thread
+ next_heartbeat = datetime.datetime.utcnow()
+ batch_limit = 10 # a guess
+ while self._running:
+
+ now = datetime.datetime.utcnow()
+ # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
+ if now >= next_heartbeat:
+ self._ind_sender.send(self._makeAgentIndMsg())
+ logging.debug("Agent Indication Sent")
+ next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+ timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds
+ # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
+ try:
+ self._session.next_receiver(timeout=timeout)
+ except Empty:
+ continue
+
+ for i in range(batch_limit):
+ try:
+ msg = self._locate_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=False)
+
+ for i in range(batch_limit):
+ try:
+ msg = self._direct_receiver.fetch(timeout=0)
+ except Empty:
+ break
+ if msg and msg.content_type == "amqp/map":
+ self._dispatch(msg, _direct=True)
+
+ if self._work_q_put and self._notifier:
+ # new stuff on work queue, kick the the application...
+ self._work_q_put = False
+ _callback_thread = currentThread()
+ logging.info("Calling agent notifier.indication")
+ self._notifier.indication()
+ _callback_thread = None
+
+ #
+ # Private:
+ #
+
+ def _makeAgentIndMsg(self):
+ """
+ Create an agent indication message identifying this agent
+ """
+ _map = {"_name": self.get_name(),
+ "_schema_timestamp": self._schema_timestamp}
+ return Message( subject=makeSubject(OpCode.agent_ind),
+ properties={"method":"response"},
+ content={MsgKey.agent_info: _map})
+
+
+ def _dispatch(self, msg, _direct=False):
+ """
+ Process a message from a console.
+
+ @param _direct: True if msg directly addressed to this agent.
+ """
+ logging.debug( "Message received from Console! [%s]" % msg )
+ try:
+ version,opcode = parseSubject(msg.subject)
+ except:
+ logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
+ return
+
+ cmap = {}; props={}
+ if msg.content_type == "amqp/map":
+ cmap = msg.content
+ if msg.properties:
+ props = msg.properties
+
+ if opcode == OpCode.agent_locate:
+ self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
+ elif opcode == OpCode.get_query:
+ self._handleQueryMsg( msg, cmap, props, version, _direct )
+ elif opcode == OpCode.method_req:
+ self._handleMethodReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.cancel_subscription:
+ logging.warning("!!! CANCEL_SUB TBD !!!")
+ elif opcode == OpCode.create_subscription:
+ logging.warning("!!! CREATE_SUB TBD !!!")
+ elif opcode == OpCode.renew_subscription:
+ logging.warning("!!! RENEW_SUB TBD !!!")
+ elif opcode == OpCode.schema_query:
+ logging.warning("!!! SCHEMA_QUERY TBD !!!")
+ elif opcode == OpCode.noop:
+ logging.debug("No-op msg received.")
+ else:
+ logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+ % opcode)
+
+ def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ):
+ """
+ Process a received agent-locate message
+ """
+ logging.debug("_handleAgentLocateMsg")
+
+ reply = True
+ if "method" in props and props["method"] == "request":
+ query = cmap.get(MsgKey.query)
+ if query is not None:
+ # fake a QmfData containing my identifier for the query compare
+ tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
+ reply = QmfQuery(query).evaluate(tmpData)
+
+ if reply:
+ try:
+ tmp_snd = self._session.sender( msg.reply_to )
+ m = self._makeAgentIndMsg()
+ m.correlation_id = msg.correlation_id
+ tmp_snd.send(m)
+ logging.debug("agent-ind sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e)))
+ else:
+ logging.debug("agent-locate msg not mine - no reply sent")
+
+
+ def _handleQueryMsg(self, msg, cmap, props, version, _direct ):
+ """
+ Handle received query message
+ """
+ logging.debug("_handleQueryMsg")
+
+ if "method" in props and props["method"] == "request":
+ qmap = cmap.get(MsgKey.query)
+ if qmap:
+ query = QmfQuery.from_map(qmap)
+ target = query.get_target()
+ if target == QmfQuery.TARGET_PACKAGES:
+ self._queryPackages( msg, query )
+ elif target == QmfQuery.TARGET_SCHEMA_ID:
+ self._querySchema( msg, query, _idOnly=True )
+ elif target == QmfQuery.TARGET_SCHEMA:
+ self._querySchema( msg, query)
+ elif target == QmfQuery.TARGET_AGENT:
+ logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
+ elif target == QmfQuery.TARGET_OBJECT_ID:
+ self._queryData(msg, query, _idOnly=True)
+ elif target == QmfQuery.TARGET_OBJECT:
+ self._queryData(msg, query)
+ else:
+ logging.warning("Unrecognized query target: '%s'" % str(target))
+
+
+
+ def _handleMethodReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Method Request
+ """
+ if "method" in props and props["method"] == "request":
+ mname = cmap.get(SchemaMethod.KEY_NAME)
+ if not mname:
+ logging.warning("Invalid method call from '%s': no name"
+ % msg.reply_to)
+ return
+
+ in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
+ oid = cmap.get(QmfData.KEY_OBJECT_ID)
+
+ print("!!! ci=%s rt=%s mn=%s oid=%s" %
+ (msg.correlation_id,
+ msg.reply_to,
+ mname,
+ oid))
+
+ handle = _MethodCallHandle(msg.correlation_id,
+ msg.reply_to,
+ mname,
+ oid)
+ param = MethodCallParams( mname, oid, in_args, msg.user_id)
+ self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
+ self._work_q_put = True
+
+ def _queryPackages(self, msg, query):
+ """
+ Run a query against the list of known packages
+ """
+ pnames = []
+ self._lock.acquire()
+ try:
+ for name in self._packages.iterkeys():
+ if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})):
+ pnames.append(name)
+ finally:
+ self._lock.release()
+
+ try:
+ tmp_snd = self._session.sender( msg.reply_to )
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content={MsgKey.package_info: pnames} )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ tmp_snd.send(m)
+ logging.debug("package_info sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+ def _querySchema( self, msg, query, _idOnly=False ):
+ """
+ """
+ schemas = []
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ found = None
+ self._lock.acquire()
+ try:
+ found = self._schema.get(query.get_id())
+ finally:
+ self._lock.release()
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
+ self._lock.acquire()
+ try:
+ for sid,val in self._schema.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ schemas.append(sid.map_encode())
+ else:
+ schemas.append(val.map_encode())
+ finally:
+ self._lock.release()
+
+
+ tmp_snd = self._session.sender( msg.reply_to )
+
+ if _idOnly:
+ content = {MsgKey.schema_id: schemas}
+ else:
+ content = {MsgKey.schema:schemas}
+
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content=content )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ try:
+ tmp_snd.send(m)
+ logging.debug("schema_id sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+ def _queryData( self, msg, query, _idOnly=False ):
+ """
+ """
+ data_objs = []
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ found = None
+ self._lock.acquire()
+ try:
+ found = self._agent_data.get(query.get_id())
+ finally:
+ self._lock.release()
+ if found:
+ if _idOnly:
+ data_objs.append(query.get_id())
+ else:
+ data_objs.append(found.map_encode())
+ else: # otherwise, evaluate all data
+ self._lock.acquire()
+ try:
+ for oid,val in self._agent_data.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(val.map_encode())
+ finally:
+ self._lock.release()
+
+ tmp_snd = self._session.sender( msg.reply_to )
+
+ if _idOnly:
+ content = {MsgKey.object_id:data_objs}
+ else:
+ content = {MsgKey.data_obj:data_objs}
+
+ m = Message( subject=makeSubject(OpCode.data_ind),
+ properties={"method":"response"},
+ content=content )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ try:
+ tmp_snd.send(m)
+ logging.debug("data reply sent to [%s]" % msg.reply_to)
+ except SendError, e:
+ logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+ ##==============================================================================
+ ## DATA MODEL
+ ##==============================================================================
+
+
+class QmfAgentData(QmfData):
+ """
+ A managed data object that is owned by an agent.
+ """
+
+ def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
+ _tag=_tag, _ctime=ctime,
+ _utime=ctime, _object_id=_object_id,
+ _schema=_schema, _const=False)
+ self._agent = agent
+
+ def destroy(self):
+ self._dtime = long(time.time() * 1000)
+ # @todo: publish change
+
+ def is_deleted(self):
+ return self._dtime == 0
+
+ def set_value(self, _name, _value, _subType=None):
+ super(QmfAgentData, self).set_value(_name, _value, _subType)
+ # @todo: publish change
+
+ def inc_value(self, name, delta=1):
+ """ add the delta to the property """
+ # @todo: need to take write-lock
+ val = self.get_value(name)
+ try:
+ val += delta
+ except:
+ raise
+ self.set_value(name, val)
+
+ def dec_value(self, name, delta=1):
+ """ subtract the delta from the property """
+ # @todo: need to take write-lock
+ logging.error(" TBD!!!")
+
+
+################################################################################
+################################################################################
+################################################################################
+################################################################################
+
+if __name__ == '__main__':
+ # static test cases - no message passing, just exercise API
+ from common import (AgentName, SchemaProperty, qmfTypes,
+ SchemaMethod, SchemaEventClass)
+
+ logging.getLogger().setLevel(logging.INFO)
+
+ logging.info( "Create an Agent" )
+ _agent_name = AgentName("redhat.com", "agent", "tross")
+ _agent = Agent(str(_agent_name))
+
+ logging.info( "Get agent name: '%s'" % _agent.get_name())
+
+ logging.info( "Create SchemaObjectClass" )
+
+ _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"])
+ # add properties
+ _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ # These two properties can be set via the method call
+ _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # add method
+ _meth = SchemaMethod(_desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ print("Schema Map='%s'" % str(_schema.map_encode()))
+
+ _agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ logging.info( "Create QmfAgentData" )
+
+ _obj = QmfAgentData( _agent, _schema=_schema )
+ _obj.set_value("index1", 100)
+ _obj.set_value("index2", "a name" )
+ _obj.set_value("set_string", "UNSET")
+ _obj.set_value("set_int", 0)
+ _obj.set_value("query_count", 0)
+ _obj.set_value("method_call_count", 0)
+
+ print("Obj1 Map='%s'" % str(_obj.map_encode()))
+
+ _agent.add_object( _obj )
+
+ _obj = QmfAgentData( _agent,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0},
+ _schema=_schema)
+
+ print("Obj2 Map='%s'" % str(_obj.map_encode()))
+
+ _agent.add_object(_obj)
+
+ ##############
+
+
+
+ logging.info( "Create SchemaEventClass" )
+
+ _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A test data schema",
+ _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)})
+ _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ print("Event Map='%s'" % str(_event.map_encode()))
+
+ _agent.register_event_class(_event)
diff --git a/qpid/python/qmf2/common.py b/qpid/python/qmf2/common.py
new file mode 100644
index 0000000000..061c9fbe78
--- /dev/null
+++ b/qpid/python/qmf2/common.py
@@ -0,0 +1,1881 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+import logging
+from threading import Lock
+from threading import Condition
+try:
+ import hashlib
+ _md5Obj = hashlib.md5
+except ImportError:
+ import md5
+ _md5Obj = md5.new
+
+
+
+
+##
+## Constants
+##
+
+
+AMQP_QMF_AGENT_LOCATE = "agent.locate"
+AMQP_QMF_AGENT_INDICATION = "agent.ind"
+AMQP_QMF_AGENT_EVENT="agent.event"
+# agent.ind[.<agent-name>]
+# agent.event.<sev>.<agent-name>
+# sev="strings"
+#
+
+AMQP_QMF_SUBJECT = "qmf"
+AMQP_QMF_VERSION = 4
+AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
+
+class MsgKey(object):
+ agent_info = "agent_info"
+ query = "query"
+ package_info = "package_info"
+ schema_id = "schema_id"
+ schema = "schema"
+ object_id="object_id"
+ data_obj="object"
+ method="method"
+ event="event"
+
+
+class OpCode(object):
+ noop = "noop"
+
+ # codes sent by a console and processed by the agent
+ agent_locate = "agent-locate"
+ cancel_subscription = "cancel-subscription"
+ create_subscription = "create-subscription"
+ get_query = "get-query"
+ method_req = "method"
+ renew_subscription = "renew-subscription"
+ schema_query = "schema-query" # @todo: deprecate
+
+ # codes sent by the agent to a console
+ agent_ind = "agent"
+ data_ind = "data"
+ event_ind = "event"
+ managed_object = "managed-object"
+ object_ind = "object"
+ response = "response"
+ schema_ind="schema" # @todo: deprecate
+
+
+
+
+def makeSubject(_code):
+ """
+ Create a message subject field value.
+ """
+ return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code)
+
+
+def parseSubject(_sub):
+ """
+ Deconstruct a subject field, return version,opcode values
+ """
+ if _sub[:3] != "qmf":
+ raise Exception("Non-QMF message received")
+
+ return _sub[3:].split('.', 1)
+
+
+##==============================================================================
+## Async Event Model
+##==============================================================================
+
+
+class Notifier(object):
+ """
+ Virtual base class that defines a call back which alerts the application that
+ a QMF Console notification is pending.
+ """
+ def indication(self):
+ """
+ Called when one or more items are ready for the application to process.
+ This method may be called by an internal QMF library thread. Its purpose is to
+ indicate that the application should process pending work items.
+ """
+ raise Exception("The indication method must be overridden by the application!")
+
+
+
+class WorkItem(object):
+ """
+ Describes an event that has arrived for the application to process. The
+ Notifier is invoked when one or more of these WorkItems become available
+ for processing.
+ """
+ # Enumeration of the types of WorkItems produced on the Console
+ AGENT_ADDED=1
+ AGENT_DELETED=2
+ NEW_PACKAGE=3
+ NEW_CLASS=4
+ OBJECT_UPDATE=5
+ EVENT_RECEIVED=7
+ AGENT_HEARTBEAT=8
+ # Enumeration of the types of WorkItems produced on the Agent
+ METHOD_CALL=1000
+ QUERY=1001
+ SUBSCRIBE=1002
+ UNSUBSCRIBE=1003
+
+ def __init__(self, kind, handle, _params=None):
+ """
+ Used by the Console to create a work item.
+
+ @type kind: int
+ @param kind: work item type
+ """
+ self._kind = kind
+ self._handle = handle
+ self._params = _params
+
+ def get_type(self):
+ return self._kind
+
+ def get_handle(self):
+ return self._handle
+
+ def get_params(self):
+ return self._params
+
+
+
+##==============================================================================
+## Addressing
+##==============================================================================
+
+class QmfAddress(object):
+ """
+ TBD
+ """
+ TYPE_DIRECT = "direct"
+ TYPE_TOPIC = "topic"
+
+ ADDRESS_FMT = "qmf.%s.%s/%s"
+ DEFAULT_DOMAIN = "default"
+
+
+ def __init__(self, name, domain, type_):
+ self._name = name
+ self._domain = domain
+ self._type = type_
+
+ def _direct(cls, name, _domain=None):
+ if _domain is None:
+ _domain = QmfAddress.DEFAULT_DOMAIN
+ return cls(name, _domain, type_=QmfAddress.TYPE_DIRECT)
+ direct = classmethod(_direct)
+
+ def _topic(cls, name, _domain=None):
+ if _domain is None:
+ _domain = QmfAddress.DEFAULT_DOMAIN
+ return cls(name, _domain, type_=QmfAddress.TYPE_TOPIC)
+ topic = classmethod(_topic)
+
+
+ def get_address(self):
+ return str(self)
+
+ def __repr__(self):
+ return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._name)
+
+
+
+
+class AgentName(object):
+ """
+ Uniquely identifies a management agent within the management domain.
+ """
+ _separator = ":"
+
+ def __init__(self, vendor, product, name, _str=None):
+ """
+ Note: this object must be immutable, as it is used to index into a dictionary
+ """
+ if _str is not None:
+ # construct from string representation
+ if _str.count(AgentName._separator) < 2:
+ raise TypeError("AgentName string format must be 'vendor.product.name'")
+ self._vendor, self._product, self._name = _str.split(AgentName._separator)
+ else:
+ self._vendor = vendor
+ self._product = product
+ self._name = name
+
+
+ def _from_str(cls, str_):
+ return cls(None, None, None, str_=str_)
+ from_str = classmethod(_from_str)
+
+ def vendor(self):
+ return self._vendor
+
+ def product(self):
+ return self._product
+
+ def name(self):
+ return self._name
+
+ def __cmp__(self, other):
+ if not isinstance(other, AgentName) :
+ raise TypeError("Invalid types for compare")
+ # return 1
+ me = str(self)
+ them = str(other)
+
+ if me < them:
+ return -1
+ if me > them:
+ return 1
+ return 0
+
+ def __hash__(self):
+ return (self._vendor, self._product, self._name).__hash__()
+
+ def __repr__(self):
+ return self._vendor + AgentName._separator + \
+ self._product + AgentName._separator + \
+ self._name
+
+
+
+##==============================================================================
+## DATA MODEL
+##==============================================================================
+
+
+class _mapEncoder(object):
+ """
+ virtual base class for all objects that support being converted to a map
+ """
+
+ def map_encode(self):
+ raise Exception("The map_encode method my be overridden.")
+
+
+class QmfData(_mapEncoder):
+ """
+ Base data class representing arbitrarily structure data. No schema or
+ managing agent is associated with data of this class.
+
+ Map format:
+ map["_values"] = map of unordered "name"=<value> pairs (optional)
+ map["_subtype"] = map of unordered "name"="subtype string" pairs (optional)
+ map["_tag"] = application-specific tag for this instance (optional)
+ """
+ KEY_VALUES = "_values"
+ KEY_SUBTYPES = "_subtypes"
+ KEY_TAG="_tag"
+ KEY_OBJECT_ID = "_object_id"
+ KEY_SCHEMA_ID = "_schema_id"
+ KEY_UPDATE_TS = "_update_ts"
+ KEY_CREATE_TS = "_create_ts"
+ KEY_DELETE_TS = "_delete_ts"
+
+ def __init__(self,
+ _values={}, _subtypes={}, _tag=None, _object_id=None,
+ _ctime = 0, _utime = 0, _dtime = 0,
+ _map=None,
+ _schema=None, _const=False):
+ """
+ @type _values: dict
+ @param _values: dictionary of initial name=value pairs for object's
+ named data.
+ @type _subtypes: dict
+ @param _subtype: dictionary of subtype strings for each of the object's
+ named data.
+ @type _desc: string
+ @param _desc: Human-readable description of this data object.
+ @type _const: boolean
+ @param _const: if true, this object cannot be modified
+ """
+ self._schema_id = None
+ if _map is not None:
+ # construct from map
+ _tag = _map.get(self.KEY_TAG, _tag)
+ _values = _map.get(self.KEY_VALUES, _values)
+ _subtypes = _map.get(self.KEY_SUBTYPES, _subtypes)
+ _object_id = _map.get(self.KEY_OBJECT_ID, _object_id)
+ sid = _map.get(self.KEY_SCHEMA_ID)
+ if sid:
+ self._schema_id = SchemaClassId(_map=sid)
+ _ctime = long(_map.get(self.KEY_CREATE_TS, _ctime))
+ _utime = long(_map.get(self.KEY_UPDATE_TS, _utime))
+ _dtime = long(_map.get(self.KEY_DELETE_TS, _dtime))
+
+ self._values = _values.copy()
+ self._subtypes = _subtypes.copy()
+ self._tag = _tag
+ self._ctime = _ctime
+ self._utime = _utime
+ self._dtime = _dtime
+ self._const = _const
+
+ if _object_id is not None:
+ self._object_id = str(_object_id)
+ else:
+ self._object_id = None
+
+ if _schema is not None:
+ self._set_schema(_schema)
+ else:
+ # careful: map constructor may have already set self._schema_id, do
+ # not override it!
+ self._schema = None
+
+ def _create(cls, values, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None, _const=False):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ return cls(_values=values, _subtypes=_subtypes, _tag=_tag,
+ _ctime=ctime, _utime=ctime,
+ _object_id=_object_id, _schema=_schema, _const=_const)
+ create = classmethod(_create)
+
+ def __from_map(cls, map_, _schema=None, _const=False):
+ return cls(_map=map_, _schema=_schema, _const=_const)
+ from_map = classmethod(__from_map)
+
+ def is_managed(self):
+ return self._object_id is not None
+
+ def is_described(self):
+ return self._schema_id is not None
+
+ def get_tag(self):
+ return self._tag
+
+ def get_value(self, name):
+ # meta-properties:
+ if name == SchemaClassId.KEY_PACKAGE:
+ if self._schema_id:
+ return self._schema_id.get_package_name()
+ return None
+ if name == SchemaClassId.KEY_CLASS:
+ if self._schema_id:
+ return self._schema_id.get_class_name()
+ return None
+ if name == SchemaClassId.KEY_TYPE:
+ if self._schema_id:
+ return self._schema_id.get_type()
+ return None
+ if name == SchemaClassId.KEY_HASH:
+ if self._schema_id:
+ return self._schema_id.get_hash_string()
+ return None
+ if name == self.KEY_SCHEMA_ID:
+ return self._schema_id
+ if name == self.KEY_OBJECT_ID:
+ return self._object_id
+ if name == self.KEY_TAG:
+ return self._tag
+ if name == self.KEY_UPDATE_TS:
+ return self._utime
+ if name == self.KEY_CREATE_TS:
+ return self._ctime
+ if name == self.KEY_DELETE_TS:
+ return self._dtime
+
+ return self._values.get(name)
+
+ def has_value(self, name):
+
+ if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS,
+ SchemaClassId.KEY_TYPE, SchemaClassId.KEY_HASH,
+ self.KEY_SCHEMA_ID]:
+ return self._schema_id is not None
+ if name in [self.KEY_UPDATE_TS, self.KEY_CREATE_TS,
+ self.KEY_DELETE_TS]:
+ return True
+ if name == self.KEY_OBJECT_ID:
+ return self._object_id is not None
+ if name == self.KEY_TAG:
+ return self._tag is not None
+
+ return name in self._values
+
+ def set_value(self, _name, _value, _subType=None):
+ if self._const:
+ raise Exception("cannot modify constant data object")
+ self._values[_name] = _value
+ if _subType:
+ self._subtypes[_name] = _subType
+ return _value
+
+ def get_subtype(self, _name):
+ return self._subtypes.get(_name)
+
+ def get_schema_class_id(self):
+ """
+ @rtype: class SchemaClassId
+ @returns: the identifier of the Schema that describes the structure of the data.
+ """
+ return self._schema_id
+
+ def get_object_id(self):
+ """
+ Get the instance's identification string.
+ @rtype: str
+ @returns: the identification string, or None if not assigned and id.
+ """
+ if self._object_id:
+ return self._object_id
+
+ # if object id not assigned, see if schema defines a set of field
+ # values to use as an id
+ if not self._schema:
+ return None
+
+ ids = self._schema.get_id_names()
+ if not ids:
+ return None
+
+ if not self._validated:
+ self._validate()
+
+ result = u""
+ for key in ids:
+ try:
+ result += unicode(self._values[key])
+ except:
+ logging.error("get_object_id(): cannot convert value '%s'."
+ % key)
+ return None
+ self._object_id = result
+ return result
+
+ def map_encode(self):
+ _map = {}
+ if self._tag:
+ _map[self.KEY_TAG] = self._tag
+
+ # data in the _values map may require recursive map_encode()
+ vmap = {}
+ for name,val in self._values.iteritems():
+ if isinstance(val, _mapEncoder):
+ vmap[name] = val.map_encode()
+ else:
+ # otherwise, just toss in the native type...
+ vmap[name] = val
+
+ _map[self.KEY_VALUES] = vmap
+ # subtypes are never complex, so safe to just copy
+ _map[self.KEY_SUBTYPES] = self._subtypes.copy()
+ if self._object_id:
+ _map[self.KEY_OBJECT_ID] = self._object_id
+ if self._schema_id:
+ _map[self.KEY_SCHEMA_ID] = self._schema_id.map_encode()
+ return _map
+
+ def _set_schema(self, schema):
+ self._validated = False
+ self._schema = schema
+ if schema:
+ self._schema_id = schema.get_class_id()
+ if self._const:
+ self._validate()
+ else:
+ self._schema_id = None
+
+ def _validate(self):
+ """
+ Compares this object's data against the associated schema. Throws an
+ exception if the data does not conform to the schema.
+ """
+ props = self._schema.get_properties()
+ for name,val in props.iteritems():
+ # @todo validate: type compatible with amqp_type?
+ # @todo validate: primary keys have values
+ if name not in self._values:
+ if val._isOptional:
+ # ok not to be present, put in dummy value
+ # to simplify access
+ self._values[name] = None
+ else:
+ raise Exception("Required property '%s' not present." % name)
+ self._validated = True
+
+ def __repr__(self):
+ return "QmfData=<<" + str(self.map_encode()) + ">>"
+
+
+ def __setattr__(self, _name, _value):
+ # ignore private data members
+ if _name[0] == '_':
+ return super(QmfData, self).__setattr__(_name, _value)
+ if _name in self._values:
+ return self.set_value(_name, _value)
+ return super(QmfData, self).__setattr__(_name, _value)
+
+ def __getattr__(self, _name):
+ if _name != "_values" and _name in self._values:
+ return self._values[_name]
+ raise AttributeError("no value named '%s' in this object" % _name)
+
+ def __getitem__(self, _name):
+ return self.__getattr__(_name)
+
+ def __setitem__(self, _name, _value):
+ return self.__setattr__(_name, _value)
+
+
+
+class QmfEvent(QmfData):
+ """
+ A QMF Event is a type of described data that is not managed. Events are
+ notifications that are sent by Agents. An event notifies a Console of a
+ change in some aspect of the system under managment.
+ """
+ KEY_TIMESTAMP = "_timestamp"
+ KEY_SEVERITY = "_severity"
+
+ SEV_EMERG = "emerg"
+ SEV_ALERT = "alert"
+ SEV_CRIT = "crit"
+ SEV_ERR = "err"
+ SEV_WARNING = "warning"
+ SEV_NOTICE = "notice"
+ SEV_INFO = "info"
+ SEV_DEBUG = "debug"
+
+ def __init__(self, _timestamp=None, _sev=SEV_NOTICE, _values={},
+ _subtypes={}, _tag=None,
+ _map=None,
+ _schema=None, _const=True):
+ """
+ @type _map: dict
+ @param _map: if not None, construct instance from map representation.
+ @type _timestamp: int
+ @param _timestamp: moment in time when event occurred, expressed
+ as milliseconds since Midnight, Jan 1, 1970 UTC.
+ @type _agentId: class AgentId
+ @param _agentId: Identifies agent issuing this event.
+ @type _schema: class Schema
+ @param _schema:
+ @type _schemaId: class SchemaClassId (event)
+ @param _schemaId: identi
+ """
+
+ if _map is not None:
+ # construct from map
+ super(QmfEvent, self).__init__(_map=_map, _schema=_schema,
+ _const=_const)
+ _timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp)
+ _sev = _map.get(self.KEY_SEVERITY, _sev)
+ else:
+ super(QmfEvent, self).__init__(_values=_values,
+ _subtypes=_subtypes, _tag=_tag,
+ _schema=_schema, _const=_const)
+ if _timestamp is None:
+ raise TypeError("QmfEvent: a valid timestamp is required.")
+
+ try:
+ self._timestamp = long(_timestamp)
+ except:
+ raise TypeError("QmfEvent: a numeric timestamp is required.")
+
+ self._severity = _sev
+
+ def _create(cls, timestamp, severity, values,
+ _subtypes={}, _tag=None, _schema=None, _const=False):
+ return cls(_timestamp=timestamp, _sev=severity, _values=values,
+ _subtypes=_subtypes, _tag=_tag, _schema=_schema, _const=_const)
+ create = classmethod(_create)
+
+ def _from_map(cls, map_, _schema=None, _const=False):
+ return cls(_map=map_, _schema=_schema, _const=_const)
+ from_map = classmethod(_from_map)
+
+ def get_timestamp(self):
+ return self._timestamp
+
+ def get_severity(self):
+ return self._severity
+
+ def map_encode(self):
+ _map = super(QmfEvent, self).map_encode()
+ _map[self.KEY_TIMESTAMP] = self._timestamp
+ _map[self.KEY_SEVERITY] = self._severity
+ return _map
+
+
+
+
+
+#==============================================================================
+#==============================================================================
+#==============================================================================
+
+
+
+
+class Arguments(object):
+ def __init__(self, map):
+ pass
+# self.map = map
+# self._by_hash = {}
+# key_count = self.map.keyCount()
+# a = 0
+# while a < key_count:
+# self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a))
+# a += 1
+
+
+# def __getitem__(self, key):
+# return self._by_hash[key]
+
+
+# def __setitem__(self, key, value):
+# self._by_hash[key] = value
+# self.set(key, value)
+
+
+# def __iter__(self):
+# return self._by_hash.__iter__
+
+
+# def __getattr__(self, name):
+# if name in self._by_hash:
+# return self._by_hash[name]
+# return super.__getattr__(self, name)
+
+
+# def __setattr__(self, name, value):
+# #
+# # ignore local data members
+# #
+# if (name[0] == '_' or
+# name == 'map'):
+# return super.__setattr__(self, name, value)
+
+# if name in self._by_hash:
+# self._by_hash[name] = value
+# return self.set(name, value)
+
+# return super.__setattr__(self, name, value)
+
+
+# def by_key(self, key):
+# val = self.map.byKey(key)
+# vType = val.getType()
+# if vType == TYPE_UINT8: return val.asUint()
+# elif vType == TYPE_UINT16: return val.asUint()
+# elif vType == TYPE_UINT32: return val.asUint()
+# elif vType == TYPE_UINT64: return val.asUint64()
+# elif vType == TYPE_SSTR: return val.asString()
+# elif vType == TYPE_LSTR: return val.asString()
+# elif vType == TYPE_ABSTIME: return val.asInt64()
+# elif vType == TYPE_DELTATIME: return val.asUint64()
+# elif vType == TYPE_REF: return ObjectId(val.asObjectId())
+# elif vType == TYPE_BOOL: return val.asBool()
+# elif vType == TYPE_FLOAT: return val.asFloat()
+# elif vType == TYPE_DOUBLE: return val.asDouble()
+# elif vType == TYPE_UUID: return val.asUuid()
+# elif vType == TYPE_INT8: return val.asInt()
+# elif vType == TYPE_INT16: return val.asInt()
+# elif vType == TYPE_INT32: return val.asInt()
+# elif vType == TYPE_INT64: return val.asInt64()
+# else:
+# # when TYPE_MAP
+# # when TYPE_OBJECT
+# # when TYPE_LIST
+# # when TYPE_ARRAY
+# logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
+# return None
+
+
+# def set(self, key, value):
+# val = self.map.byKey(key)
+# vType = val.getType()
+# if vType == TYPE_UINT8: return val.setUint(value)
+# elif vType == TYPE_UINT16: return val.setUint(value)
+# elif vType == TYPE_UINT32: return val.setUint(value)
+# elif vType == TYPE_UINT64: return val.setUint64(value)
+# elif vType == TYPE_SSTR:
+# if value:
+# return val.setString(value)
+# else:
+# return val.setString('')
+# elif vType == TYPE_LSTR:
+# if value:
+# return val.setString(value)
+# else:
+# return val.setString('')
+# elif vType == TYPE_ABSTIME: return val.setInt64(value)
+# elif vType == TYPE_DELTATIME: return val.setUint64(value)
+# elif vType == TYPE_REF: return val.setObjectId(value.impl)
+# elif vType == TYPE_BOOL: return val.setBool(value)
+# elif vType == TYPE_FLOAT: return val.setFloat(value)
+# elif vType == TYPE_DOUBLE: return val.setDouble(value)
+# elif vType == TYPE_UUID: return val.setUuid(value)
+# elif vType == TYPE_INT8: return val.setInt(value)
+# elif vType == TYPE_INT16: return val.setInt(value)
+# elif vType == TYPE_INT32: return val.setInt(value)
+# elif vType == TYPE_INT64: return val.setInt64(value)
+# else:
+# # when TYPE_MAP
+# # when TYPE_OBJECT
+# # when TYPE_LIST
+# # when TYPE_ARRAY
+# logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
+# return None
+
+
+
+#class MethodResponse(object):
+# def __init__(self, impl):
+# pass
+# self.impl = qmfengine.MethodResponse(impl)
+
+
+# def status(self):
+# return self.impl.getStatus()
+
+
+# def exception(self):
+# return self.impl.getException()
+
+
+# def text(self):
+# return exception().asString()
+
+
+# def args(self):
+# return Arguments(self.impl.getArgs())
+
+
+# def __getattr__(self, name):
+# myArgs = self.args()
+# return myArgs.__getattr__(name)
+
+
+# def __setattr__(self, name, value):
+# if name == 'impl':
+# return super.__setattr__(self, name, value)
+
+# myArgs = self.args()
+# return myArgs.__setattr__(name, value)
+
+
+
+# ##==============================================================================
+# ## QUERY
+# ##==============================================================================
+
+
+
+# def _doQuery(predicate, params ):
+# """
+# Given the predicate from a query, and a map of named parameters, apply the predicate
+# to the parameters, and return True or False.
+# """
+# if type(predicate) != list or len(predicate) < 1:
+# return False
+
+# elif opr == Query._LOGIC_AND:
+# logging.debug("_doQuery() AND: [%s]" % predicate )
+# rc = False
+# for exp in predicate[1:]:
+# rc = _doQuery( exp, params )
+# if not rc:
+# break
+# return rc
+
+# elif opr == Query._LOGIC_OR:
+# logging.debug("_doQuery() OR: [%s]" % predicate )
+# rc = False
+# for exp in predicate[1:]:
+# rc = _doQuery( exp, params )
+# if rc:
+# break
+# return rc
+
+# elif opr == Query._LOGIC_NOT:
+# logging.debug("_doQuery() NOT: [%s]" % predicate )
+# if len(predicate) != 2:
+# logging.warning("Malformed query not-expression received: '%s'" % predicate)
+# return False
+# return not _doQuery( predicate[1:], params )
+
+
+
+# else:
+# logging.warning("Unknown query operator received: '%s'" % opr)
+# return False
+
+
+
+class QmfQuery(_mapEncoder):
+
+ KEY_TARGET="what"
+ KEY_PREDICATE="where"
+ KEY_ID="id"
+
+ ### Query Types
+ ID=1
+ PREDICATE=2
+
+ #### Query Targets ####
+ TARGET_PACKAGES="schema_package"
+ # (returns just package names)
+ # allowed predicate key(s):
+ #
+ # SchemaClassId.KEY_PACKAGE
+
+ TARGET_SCHEMA_ID="schema_id"
+ TARGET_SCHEMA="schema"
+ # allowed predicate key(s):
+ #
+ # SchemaClassId.KEY_PACKAGE
+ # SchemaClassId.KEY_CLASS
+ # SchemaClassId.KEY_TYPE
+ # SchemaClassId.KEY_HASH
+ # SchemaClass.KEY_SCHEMA_ID
+ # name of property (exist test only)
+ # name of method (exist test only)
+
+ TARGET_AGENT="agent"
+ # allowed predicate keys(s):
+ #
+ KEY_AGENT_NAME="_name"
+
+ TARGET_OBJECT_ID="object_id"
+ TARGET_OBJECT="object"
+ # allowed predicate keys(s):
+ #
+ # SchemaClassId.KEY_PACKAGE
+ # SchemaClassId.KEY_CLASS
+ # SchemaClassId.KEY_TYPE
+ # SchemaClassId.KEY_HASH
+ # QmfData.KEY_SCHEMA_ID
+ # QmfData.KEY_OBJECT_ID
+ # QmfData.KEY_UPDATE_TS
+ # QmfData.KEY_CREATE_TS
+ # QmfData.KEY_DELETE_TS
+ # <name of data value>
+
+ CMP_EQ="eq"
+ CMP_NE="ne"
+ CMP_LT="lt"
+ CMP_LE="le"
+ CMP_GT="gt"
+ CMP_GE="ge"
+ CMP_RE_MATCH="re_match"
+ CMP_EXISTS="exists"
+ CMP_TRUE="true"
+ CMP_FALSE="false"
+
+ LOGIC_AND="and"
+ LOGIC_OR="or"
+ LOGIC_NOT="not"
+
+ _valid_targets = [TARGET_PACKAGES, TARGET_OBJECT_ID, TARGET_SCHEMA, TARGET_SCHEMA_ID,
+ TARGET_OBJECT, TARGET_AGENT]
+
+ def __init__(self, _target=None, _target_params=None, _predicate=None,
+ _id=None, _map=None):
+ """
+ """
+ if _map is not None:
+ target_map = _map.get(self.KEY_TARGET)
+ if not target_map:
+ raise TypeError("QmfQuery requires a target map")
+
+ _target = None
+ for key in target_map.iterkeys():
+ if key in self._valid_targets:
+ _target = key
+ break
+
+ _target_params = target_map.get(_target)
+
+ _id = _map.get(self.KEY_ID)
+ if _id is not None:
+ # Convert identifier to native type if necessary
+ if _target == self.TARGET_SCHEMA:
+ _id = SchemaClassId.from_map(_id)
+ else:
+ pred = _map.get(self.KEY_PREDICATE)
+ if pred:
+ _predicate = QmfQueryPredicate(pred)
+
+ self._target = _target
+ if not self._target:
+ raise TypeError("QmfQuery requires a target value")
+ self._target_params = _target_params
+ self._predicate = _predicate
+ self._id = _id
+
+ # constructors
+ def _create_wildcard(cls, target, _target_params=None):
+ return cls(_target=target, _target_params=_target_params)
+ create_wildcard = classmethod(_create_wildcard)
+
+ def _create_predicate(cls, target, predicate, _target_params=None):
+ return cls(_target=target, _target_params=_target_params,
+ _predicate=predicate)
+ create_predicate = classmethod(_create_predicate)
+
+ def _create_id(cls, target, ident, _target_params=None):
+ return cls(_target=target, _target_params=_target_params, _id=ident)
+ create_id = classmethod(_create_id)
+
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
+
+ def get_target(self):
+ return self._target
+
+ def get_target_param(self):
+ return self._target_params
+
+ def get_selector(self):
+ if self._id:
+ return QmfQuery.ID
+ else:
+ return QmfQuery.PREDICATE
+
+ def get_id(self):
+ return self._id
+
+ def get_predicate(self):
+ """
+ """
+ return self._predicate
+
+ def evaluate(self, qmfData):
+ """
+ """
+ if self._id:
+ if self._target == self.TARGET_SCHEMA:
+ return (qmfData.has_value(qmfData.KEY_SCHEMA_ID) and
+ qmfData.get_value(qmfData.KEY_SCHEMA_ID) == self._id)
+ elif self._target == self.TARGET_OBJECT:
+ return (qmfData.has_value(qmfData.KEY_OBJECT_ID) and
+ qmfData.get_value(qmfData.KEY_OBJECT_ID) == self._id)
+ elif self._target == self.TARGET_AGENT:
+ return (qmfData.has_value(self.KEY_AGENT_NAME) and
+ qmfData.get_value(self.KEY_AGENT_NAME) == self._id)
+
+ raise Exception("Unsupported query target '%s'" % str(self._target))
+
+ if self._predicate:
+ return self._predicate.evaluate(qmfData)
+ # no predicate and no id - always match
+ return True
+
+ def map_encode(self):
+ _map = {self.KEY_TARGET: {self._target: self._target_params}}
+ if self._id is not None:
+ if isinstance(self._id, _mapEncoder):
+ _map[self.KEY_ID] = self._id.map_encode()
+ else:
+ _map[self.KEY_ID] = self._id
+ elif self._predicate is not None:
+ _map[self.KEY_PREDICATE] = self._predicate.map_encode()
+ return _map
+
+ def __repr__(self):
+ return "QmfQuery=<<" + str(self.map_encode()) + ">>"
+
+
+
+class QmfQueryPredicate(_mapEncoder):
+ """
+ Class for Query predicates.
+ """
+ _valid_cmp_ops = [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT,
+ QmfQuery.CMP_GT, QmfQuery.CMP_LE, QmfQuery.CMP_GE,
+ QmfQuery.CMP_EXISTS, QmfQuery.CMP_RE_MATCH,
+ QmfQuery.CMP_TRUE, QmfQuery.CMP_FALSE]
+ _valid_logic_ops = [QmfQuery.LOGIC_AND, QmfQuery.LOGIC_OR, QmfQuery.LOGIC_NOT]
+
+
+ def __init__( self, pmap):
+ """
+ {"op": listOf(operands)}
+ """
+ self._oper = None
+ self._operands = []
+
+ logic_op = False
+ if type(pmap) == dict:
+ for key in pmap.iterkeys():
+ if key in self._valid_cmp_ops:
+ # comparison operation - may have "name" and "value"
+ self._oper = key
+ break
+ if key in self._valid_logic_ops:
+ logic_op = True
+ self._oper = key
+ break
+
+ if not self._oper:
+ raise TypeError("invalid predicate expression: '%s'" % str(pmap))
+
+ if type(pmap[self._oper]) == list or type(pmap[self._oper]) == tuple:
+ if logic_op:
+ for exp in pmap[self._oper]:
+ self.append(QmfQueryPredicate(exp))
+ else:
+ self._operands = list(pmap[self._oper])
+
+ else:
+ raise TypeError("invalid predicate: '%s'" % str(pmap))
+
+
+ def append(self, operand):
+ """
+ Append another operand to a predicate expression
+ """
+ self._operands.append(operand)
+
+
+
+ def evaluate( self, qmfData ):
+ """
+ """
+ if not isinstance(qmfData, QmfData):
+ raise TypeError("Query expects to evaluate QmfData types.")
+
+ if self._oper == QmfQuery.CMP_TRUE:
+ logging.debug("query evaluate TRUE")
+ return True
+ if self._oper == QmfQuery.CMP_FALSE:
+ logging.debug("query evaluate FALSE")
+ return False
+
+ if self._oper in [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT,
+ QmfQuery.CMP_LE, QmfQuery.CMP_GT, QmfQuery.CMP_GE,
+ QmfQuery.CMP_RE_MATCH]:
+ if len(self._operands) != 2:
+ logging.warning("Malformed query compare expression received: '%s, %s'" %
+ (self._oper, str(self._operands)))
+ return False
+ # @todo: support regular expression match
+ name = self._operands[0]
+ logging.debug("looking for: '%s'" % str(name))
+ if not qmfData.has_value(name):
+ logging.warning("Malformed query, attribute '%s' not present."
+ % name)
+ return False
+
+ arg1 = qmfData.get_value(name)
+ arg2 = self._operands[1]
+ logging.debug("query evaluate %s: '%s' '%s' '%s'" %
+ (name, str(arg1), self._oper, str(arg2)))
+ try:
+ if self._oper == QmfQuery.CMP_EQ: return arg1 == arg2
+ if self._oper == QmfQuery.CMP_NE: return arg1 != arg2
+ if self._oper == QmfQuery.CMP_LT: return arg1 < arg2
+ if self._oper == QmfQuery.CMP_LE: return arg1 <= arg2
+ if self._oper == QmfQuery.CMP_GT: return arg1 > arg2
+ if self._oper == QmfQuery.CMP_GE: return arg1 >= arg2
+ if self._oper == QmfQuery.CMP_RE_MATCH:
+ logging.error("!!! RE QUERY TBD !!!")
+ return False
+ except:
+ pass
+ logging.warning("Malformed query - %s: '%s' '%s' '%s'" %
+ (name, str(arg1), self._oper, str(self._operands[1])))
+ return False
+
+
+ if self._oper == QmfQuery.CMP_EXISTS:
+ if len(self._operands) != 1:
+ logging.warning("Malformed query present expression received")
+ return False
+ name = self._operands[0]
+ logging.debug("query evaluate PRESENT: [%s]" % str(name))
+ return qmfData.has_value(name)
+
+ if self._oper == QmfQuery.LOGIC_AND:
+ logging.debug("query evaluate AND: '%s'" % str(self._operands))
+ for exp in self._operands:
+ if not exp.evaluate(qmfData):
+ return False
+ return True
+
+ if self._oper == QmfQuery.LOGIC_OR:
+ logging.debug("query evaluate OR: [%s]" % str(self._operands))
+ for exp in self._operands:
+ if exp.evaluate(qmfData):
+ return True
+ return False
+
+ if self._oper == QmfQuery.LOGIC_NOT:
+ logging.debug("query evaluate NOT: [%s]" % str(self._operands))
+ for exp in self._operands:
+ if exp.evaluate(qmfData):
+ return False
+ return True
+
+ logging.warning("Unrecognized query operator: [%s]" % str(self._oper))
+ return False
+
+
+ def map_encode(self):
+ _map = {}
+ _list = []
+ for exp in self._operands:
+ if isinstance(exp, QmfQueryPredicate):
+ _list.append(exp.map_encode())
+ else:
+ _list.append(exp)
+ _map[self._oper] = _list
+ return _map
+
+
+ def __repr__(self):
+ return "QmfQueryPredicate=<<" + str(self.map_encode()) + ">>"
+
+
+
+##==============================================================================
+## SCHEMA
+##==============================================================================
+
+
+# Argument typecodes, access, and direction qualifiers
+
+class qmfTypes(object):
+ TYPE_UINT8 = 1
+ TYPE_UINT16 = 2
+ TYPE_UINT32 = 3
+ TYPE_UINT64 = 4
+
+ TYPE_SSTR = 6
+ TYPE_LSTR = 7
+
+ TYPE_ABSTIME = 8
+ TYPE_DELTATIME = 9
+
+ TYPE_REF = 10
+
+ TYPE_BOOL = 11
+
+ TYPE_FLOAT = 12
+ TYPE_DOUBLE = 13
+
+ TYPE_UUID = 14
+
+ TYPE_MAP = 15
+
+ TYPE_INT8 = 16
+ TYPE_INT16 = 17
+ TYPE_INT32 = 18
+ TYPE_INT64 = 19
+
+ TYPE_OBJECT = 20
+
+ TYPE_LIST = 21
+
+ TYPE_ARRAY = 22
+
+# New subtypes:
+# integer (for time, duration, signed/unsigned)
+# double (float)
+# bool
+# string
+# map (ref, qmfdata)
+# list
+# uuid
+
+
+class qmfAccess(object):
+ READ_CREATE = 1
+ READ_WRITE = 2
+ READ_ONLY = 3
+
+
+class qmfDirection(object):
+ DIR_IN = 1
+ DIR_OUT = 2
+ DIR_IN_OUT = 3
+
+
+
+def _toBool( param ):
+ """
+ Helper routine to convert human-readable representations of
+ boolean values to python bool types.
+ """
+ _false_strings = ["off", "no", "false", "0", "none"]
+ _true_strings = ["on", "yes", "true", "1"]
+ if type(param) == str:
+ lparam = param.lower()
+ if lparam in _false_strings:
+ return False
+ if lparam in _true_strings:
+ return True
+ raise TypeError("unrecognized boolean string: '%s'" % param )
+ else:
+ return bool(param)
+
+
+
+class SchemaClassId(_mapEncoder):
+ """
+ Unique identifier for an instance of a SchemaClass.
+
+ Map format:
+ map["package_name"] = str, name of associated package
+ map["class_name"] = str, name of associated class
+ map["type"] = str, "data"|"event", default: "data"
+ optional:
+ map["hash_str"] = str, hash value in standard format or None
+ if hash is unknown.
+ """
+ KEY_PACKAGE="_package_name"
+ KEY_CLASS="_class_name"
+ KEY_TYPE="_type"
+ KEY_HASH="_hash_str"
+
+ TYPE_DATA = "_data"
+ TYPE_EVENT = "_event"
+
+ _valid_types=[TYPE_DATA, TYPE_EVENT]
+ _schemaHashStrFormat = "%08x-%08x-%08x-%08x"
+ _schemaHashStrDefault = "00000000-00000000-00000000-00000000"
+
+ def __init__(self, pname=None, cname=None, stype=TYPE_DATA, hstr=None,
+ _map=None):
+ """
+ @type pname: str
+ @param pname: the name of the class's package
+ @type cname: str
+ @param cname: name of the class
+ @type stype: str
+ @param stype: schema type [data | event]
+ @type hstr: str
+ @param hstr: the hash value in '%08x-%08x-%08x-%08x' format
+ """
+ if _map is not None:
+ # construct from map
+ pname = _map.get(self.KEY_PACKAGE, pname)
+ cname = _map.get(self.KEY_CLASS, cname)
+ stype = _map.get(self.KEY_TYPE, stype)
+ hstr = _map.get(self.KEY_HASH, hstr)
+
+ self._pname = pname
+ self._cname = cname
+ if stype not in SchemaClassId._valid_types:
+ raise TypeError("Invalid SchemaClassId type: '%s'" % stype)
+ self._type = stype
+ self._hstr = hstr
+ if self._hstr:
+ try:
+ # sanity check the format of the hash string
+ hexValues = hstr.split("-")
+ h0 = int(hexValues[0], 16)
+ h1 = int(hexValues[1], 16)
+ h2 = int(hexValues[2], 16)
+ h3 = int(hexValues[3], 16)
+ except:
+ raise Exception("Invalid SchemaClassId format: bad hash string: '%s':"
+ % hstr)
+ # constructor
+ def _create(cls, pname, cname, stype=TYPE_DATA, hstr=None):
+ return cls(pname=pname, cname=cname, stype=stype, hstr=hstr)
+ create = classmethod(_create)
+
+ # map constructor
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
+
+ def get_package_name(self):
+ """
+ Access the package name in the SchemaClassId.
+
+ @rtype: str
+ """
+ return self._pname
+
+
+ def get_class_name(self):
+ """
+ Access the class name in the SchemaClassId
+
+ @rtype: str
+ """
+ return self._cname
+
+
+ def get_hash_string(self):
+ """
+ Access the schema's hash as a string value
+
+ @rtype: str
+ """
+ return self._hstr
+
+
+ def get_type(self):
+ """
+ Returns the type code associated with this Schema
+
+ @rtype: str
+ """
+ return self._type
+
+ def map_encode(self):
+ _map = {}
+ _map[self.KEY_PACKAGE] = self._pname
+ _map[self.KEY_CLASS] = self._cname
+ _map[self.KEY_TYPE] = self._type
+ if self._hstr: _map[self.KEY_HASH] = self._hstr
+ return _map
+
+ def __repr__(self):
+ hstr = self.get_hash_string()
+ if not hstr:
+ hstr = SchemaClassId._schemaHashStrDefault
+ return self._pname + ":" + self._cname + ":" + self._type + "(" + hstr + ")"
+
+
+ def __cmp__(self, other):
+ if isinstance(other, dict):
+ other = SchemaClassId.from_map(other)
+ if not isinstance(other, SchemaClassId):
+ raise TypeError("Invalid types for compare")
+ # return 1
+ me = str(self)
+ them = str(other)
+ if me < them:
+ return -1
+ if me > them:
+ return 1
+ return 0
+
+
+ def __hash__(self):
+ return (self._pname, self._cname, self._hstr).__hash__()
+
+
+
+class SchemaProperty(_mapEncoder):
+ """
+ Describes the structure of a Property data object.
+ Map format:
+ map["amqp_type"] = int, AMQP type code indicating property's data type
+
+ optional:
+ map["access"] = str, access allowed to this property, default "RO"
+ map["index"] = bool, True if this property is an index value, default False
+ map["optional"] = bool, True if this property is optional, default False
+ map["unit"] = str, describes units used
+ map["min"] = int, minimum allowed value
+ map["max"] = int, maximun allowed value
+ map["maxlen"] = int, if string type, this is the maximum length in bytes
+ required to represent the longest instance of this string.
+ map["desc"] = str, human-readable description of this argument
+ map["reference"] = str, ???
+ map["parent_ref"] = bool, true if this property references an object in
+ which this object is in a child-parent relationship. Default False
+ """
+ __hash__ = None
+ _access_strings = ["RO","RW","RC"]
+ _dir_strings = ["I", "O", "IO"]
+ def __init__(self, _type_code=None, _map=None, kwargs={}):
+ if _map is not None:
+ # construct from map
+ _type_code = _map.get("amqp_type", _type_code)
+ kwargs = _map
+ if not _type_code:
+ raise TypeError("SchemaProperty: amqp_type is a mandatory"
+ " parameter")
+
+ self._type = _type_code
+ self._access = "RO"
+ self._isIndex = False
+ self._isOptional = False
+ self._unit = None
+ self._min = None
+ self._max = None
+ self._maxlen = None
+ self._desc = None
+ self._reference = None
+ self._isParentRef = False
+ self._dir = None
+ self._default = None
+
+ for key, value in kwargs.items():
+ if key == "access":
+ value = str(value).upper()
+ if value not in self._access_strings:
+ raise TypeError("invalid value for access parameter: '%s':" % value )
+ self._access = value
+ elif key == "index" : self._isIndex = _toBool(value)
+ elif key == "optional": self._isOptional = _toBool(value)
+ elif key == "unit" : self._unit = value
+ elif key == "min" : self._min = value
+ elif key == "max" : self._max = value
+ elif key == "maxlen" : self._maxlen = value
+ elif key == "desc" : self._desc = value
+ elif key == "reference" : self._reference = value
+ elif key == "parent_ref" : self._isParentRef = _toBool(value)
+ elif key == "dir":
+ value = str(value).upper()
+ if value not in self._dir_strings:
+ raise TypeError("invalid value for direction parameter: '%s'" % value)
+ self._dir = value
+ elif key == "default" : self._default = value
+
+ # constructor
+ def _create(cls, type_code, kwargs={}):
+ return cls(_type_code=type_code, kwargs=kwargs)
+ create = classmethod(_create)
+
+ # map constructor
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
+
+ def getType(self): return self._type
+
+ def getAccess(self): return self._access
+
+ def is_optional(self): return self._isOptional
+
+ def isIndex(self): return self._isIndex
+
+ def getUnit(self): return self._unit
+
+ def getMin(self): return self._min
+
+ def getMax(self): return self._max
+
+ def getMaxLen(self): return self._maxlen
+
+ def getDesc(self): return self._desc
+
+ def getReference(self): return self._reference
+
+ def isParentRef(self): return self._isParentRef
+
+ def get_direction(self): return self._dir
+
+ def get_default(self): return self._default
+
+ def map_encode(self):
+ """
+ Return the map encoding of this schema.
+ """
+ _map = {}
+ _map["amqp_type"] = self._type
+ _map["access"] = self._access
+ _map["index"] = self._isIndex
+ _map["optional"] = self._isOptional
+ if self._unit: _map["unit"] = self._unit
+ if self._min: _map["min"] = self._min
+ if self._max: _map["max"] = self._max
+ if self._maxlen: _map["maxlen"] = self._maxlen
+ if self._desc: _map["desc"] = self._desc
+ if self._reference: _map["reference"] = self._reference
+ _map["parent_ref"] = self._isParentRef
+ if self._dir: _map["dir"] = self._dir
+ if self._default: _map["default"] = self._default
+ return _map
+
+ def __repr__(self):
+ return "SchemaProperty=<<" + str(self.map_encode()) + ">>"
+
+ def _updateHash(self, hasher):
+ """
+ Update the given hash object with a hash computed over this schema.
+ """
+ hasher.update(str(self._type))
+ hasher.update(str(self._isIndex))
+ hasher.update(str(self._isOptional))
+ if self._access: hasher.update(self._access)
+ if self._unit: hasher.update(self._unit)
+ if self._desc: hasher.update(self._desc)
+ if self._dir: hasher.update(self._dir)
+ if self._default: hasher.update(self._default)
+
+
+
+class SchemaMethod(_mapEncoder):
+ """
+ The SchemaMethod class describes the method's structure, and contains a
+ SchemaProperty class for each argument declared by the method.
+
+ Map format:
+ map["arguments"] = map of "name"=<SchemaProperty> pairs.
+ map["desc"] = str, description of the method
+ """
+ KEY_NAME="_name"
+ KEY_ARGUMENTS="_arguments"
+ KEY_DESC="_desc"
+ KEY_ERROR="_error"
+ def __init__(self, _args={}, _desc=None, _map=None):
+ """
+ Construct a SchemaMethod.
+
+ @type args: map of "name"=<SchemaProperty> objects
+ @param args: describes the arguments accepted by the method
+ @type _desc: str
+ @param _desc: Human-readable description of the schema
+ """
+ if _map is not None:
+ _desc = _map.get(self.KEY_DESC)
+ margs = _map.get(self.KEY_ARGUMENTS)
+ if margs:
+ # margs are in map format - covert to SchemaProperty
+ tmp_args = {}
+ for name,val in margs.iteritems():
+ tmp_args[name] = SchemaProperty.from_map(val)
+ _args=tmp_args
+
+ self._arguments = _args.copy()
+ self._desc = _desc
+
+ # map constructor
+ def _from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(_from_map)
+
+ def get_desc(self): return self._desc
+
+ def get_arg_count(self): return len(self._arguments)
+
+ def get_arguments(self): return self._arguments.copy()
+
+ def get_argument(self, name): return self._arguments.get(name)
+
+ def add_argument(self, name, schema):
+ """
+ Add an argument to the list of arguments passed to this method.
+ Used by an agent for dynamically creating method schema.
+
+ @type name: string
+ @param name: name of new argument
+ @type schema: SchemaProperty
+ @param schema: SchemaProperty to add to this method
+ """
+ if not isinstance(schema, SchemaProperty):
+ raise TypeError("argument must be a SchemaProperty class")
+ # "Input" argument, by default
+ if schema._dir is None:
+ schema._dir = "I"
+ self._arguments[name] = schema
+
+ def map_encode(self):
+ """
+ Return the map encoding of this schema.
+ """
+ _map = {}
+ _args = {}
+ for name,val in self._arguments.iteritems():
+ _args[name] = val.map_encode()
+ _map[self.KEY_ARGUMENTS] = _args
+ if self._desc: _map[self.KEY_DESC] = self._desc
+ return _map
+
+ def __repr__(self):
+ result = "SchemaMethod=<<args=("
+ first = True
+ for name,arg in self._arguments.iteritems():
+ if first:
+ first = False
+ else:
+ result += ", "
+ result += name
+ result += ")>>"
+ return result
+
+ def _updateHash(self, hasher):
+ """
+ Update the given hash object with a hash computed over this schema.
+ """
+ for name,val in self._arguments.iteritems():
+ hasher.update(name)
+ val._updateHash(hasher)
+ if self._desc: hasher.update(self._desc)
+
+
+
+class SchemaClass(QmfData):
+ """
+ Base class for Data and Event Schema classes.
+
+ Map format:
+ map(QmfData), plus:
+ map["_schema_id"] = map representation of a SchemaClassId instance
+ map["_primary_key_names"] = order list of primary key names
+ """
+ KEY_PRIMARY_KEY_NAMES="_primary_key_names"
+ KEY_DESC = "_desc"
+
+ SUBTYPE_PROPERTY="qmfProperty"
+ SUBTYPE_METHOD="qmfMethod"
+
+ def __init__(self, _classId=None, _desc=None, _map=None):
+ """
+ Schema Class constructor.
+
+ @type classId: class SchemaClassId
+ @param classId: Identifier for this SchemaClass
+ @type _desc: str
+ @param _desc: Human-readable description of the schema
+ """
+ if _map is not None:
+ super(SchemaClass, self).__init__(_map=_map)
+
+ # decode each value based on its type
+ for name,value in self._values.iteritems():
+ if self._subtypes.get(name) == self.SUBTYPE_METHOD:
+ self._values[name] = SchemaMethod.from_map(value)
+ else:
+ self._values[name] = SchemaProperty.from_map(value)
+ cid = _map.get(self.KEY_SCHEMA_ID)
+ if cid:
+ _classId = SchemaClassId.from_map(cid)
+ self._object_id_names = _map.get(self.KEY_PRIMARY_KEY_NAMES,[])
+ _desc = _map.get(self.KEY_DESC)
+ else:
+ super(SchemaClass, self).__init__()
+ self._object_id_names = []
+
+ self._classId = _classId
+ self._desc = _desc
+
+ def get_class_id(self):
+ if not self._classId.get_hash_string():
+ self.generate_hash()
+ return self._classId
+
+ def get_desc(self): return self._desc
+
+ def generate_hash(self):
+ """
+ generate an md5 hash over the body of the schema,
+ and return a string representation of the hash
+ in format "%08x-%08x-%08x-%08x"
+ """
+ md5Hash = _md5Obj()
+ md5Hash.update(self._classId.get_package_name())
+ md5Hash.update(self._classId.get_class_name())
+ md5Hash.update(self._classId.get_type())
+ for name,x in self._values.iteritems():
+ md5Hash.update(name)
+ x._updateHash( md5Hash )
+ for name,value in self._subtypes.iteritems():
+ md5Hash.update(name)
+ md5Hash.update(value)
+ idx = 0
+ for name in self._object_id_names:
+ md5Hash.update(str(idx) + name)
+ idx += 1
+ hstr = md5Hash.hexdigest()[0:8] + "-" +\
+ md5Hash.hexdigest()[8:16] + "-" +\
+ md5Hash.hexdigest()[16:24] + "-" +\
+ md5Hash.hexdigest()[24:32]
+ # update classId with new hash value
+ self._classId._hstr = hstr
+ return hstr
+
+
+ def get_property_count(self):
+ count = 0
+ for value in self._subtypes.itervalues():
+ if value == self.SUBTYPE_PROPERTY:
+ count += 1
+ return count
+
+ def get_properties(self):
+ props = {}
+ for name,value in self._subtypes.iteritems():
+ if value == self.SUBTYPE_PROPERTY:
+ props[name] = self._values.get(name)
+ return props
+
+ def get_property(self, name):
+ if self._subtypes.get(name) == self.SUBTYPE_PROPERTY:
+ return self._values.get(name)
+ return None
+
+ def add_property(self, name, prop):
+ self.set_value(name, prop, self.SUBTYPE_PROPERTY)
+ # need to re-generate schema hash
+ self._classId._hstr = None
+
+ def get_value(self, name):
+ # check for meta-properties first
+ if name == SchemaClassId.KEY_PACKAGE:
+ return self._classId.get_package_name()
+ if name == SchemaClassId.KEY_CLASS:
+ return self._classId.get_class_name()
+ if name == SchemaClassId.KEY_TYPE:
+ return self._classId.get_type()
+ if name == SchemaClassId.KEY_HASH:
+ return self.get_class_id().get_hash_string()
+ if name == self.KEY_SCHEMA_ID:
+ return self.get_class_id()
+ if name == self.KEY_PRIMARY_KEY_NAMES:
+ return self._object_id_names[:]
+ return super(SchemaClass, self).get_value(name)
+
+ def has_value(self, name):
+ if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS, SchemaClassId.KEY_TYPE,
+ SchemaClassId.KEY_HASH, self.KEY_SCHEMA_ID, self.KEY_PRIMARY_KEY_NAMES]:
+ return True
+ super(SchemaClass, self).has_value(name)
+
+ def map_encode(self):
+ """
+ Return the map encoding of this schema.
+ """
+ _map = super(SchemaClass,self).map_encode()
+ _map[self.KEY_SCHEMA_ID] = self.get_class_id().map_encode()
+ if self._object_id_names:
+ _map[self.KEY_PRIMARY_KEY_NAMES] = self._object_id_names[:]
+ if self._desc:
+ _map[self.KEY_DESC] = self._desc
+ return _map
+
+ def __repr__(self):
+ return str(self.get_class_id())
+
+
+
+class SchemaObjectClass(SchemaClass):
+ """
+ A schema class that describes a data object. The data object is composed
+ of zero or more properties and methods. An instance of the SchemaObjectClass
+ can be identified using a key generated by concantenating the values of
+ all properties named in the primary key list.
+
+ Map format:
+ map(SchemaClass)
+ """
+ def __init__(self, _classId=None, _desc=None,
+ _props={}, _methods={}, _object_id_names=None,
+ _map=None):
+ """
+ @type pname: str
+ @param pname: name of package this schema belongs to
+ @type cname: str
+ @param cname: class name for this schema
+ @type desc: str
+ @param desc: Human-readable description of the schema
+ @type _hash: str
+ @param _methods: hash computed on the body of this schema, if known
+ @type _props: map of 'name':<SchemaProperty> objects
+ @param _props: all properties provided by this schema
+ @type _pkey: list of strings
+ @param _pkey: names of each property to be used for constructing the primary key
+ @type _methods: map of 'name':<SchemaMethod> objects
+ @param _methods: all methods provided by this schema
+ """
+ if _map is not None:
+ super(SchemaObjectClass,self).__init__(_map=_map)
+ else:
+ super(SchemaObjectClass, self).__init__(_classId=_classId, _desc=_desc)
+ self._object_id_names = _object_id_names
+ for name,value in _props.iteritems():
+ self.set_value(name, value, self.SUBTYPE_PROPERTY)
+ for name,value in _methods.iteritems():
+ self.set_value(name, value, self.SUBTYPE_METHOD)
+
+ if self._classId.get_type() != SchemaClassId.TYPE_DATA:
+ raise TypeError("Invalid ClassId type for data schema: %s" % self._classId)
+
+ # map constructor
+ def __from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(__from_map)
+
+ def get_id_names(self):
+ return self._object_id_names[:]
+
+ def get_method_count(self):
+ count = 0
+ for value in self._subtypes.itervalues():
+ if value == self.SUBTYPE_METHOD:
+ count += 1
+ return count
+
+ def get_methods(self):
+ meths = {}
+ for name,value in self._subtypes.iteritems():
+ if value == self.SUBTYPE_METHOD:
+ meths[name] = self._values.get(name)
+ return meths
+
+ def get_method(self, name):
+ if self._subtypes.get(name) == self.SUBTYPE_METHOD:
+ return self._values.get(name)
+ return None
+
+ def add_method(self, name, method):
+ self.set_value(name, method, self.SUBTYPE_METHOD)
+ # need to re-generate schema hash
+ self._classId._hstr = None
+
+
+
+
+class SchemaEventClass(SchemaClass):
+ """
+ A schema class that describes an event. The event is composed
+ of zero or more properties.
+
+ Map format:
+ map["schema_id"] = map, SchemaClassId map for this object.
+ map["desc"] = string description of this schema
+ map["properties"] = map of "name":SchemaProperty values.
+ """
+ def __init__(self, _classId=None, _desc=None, _props={},
+ _map=None):
+ if _map is not None:
+ super(SchemaEventClass,self).__init__(_map=_map)
+ else:
+ super(SchemaEventClass, self).__init__(_classId=_classId,
+ _desc=_desc)
+ for name,value in _props.iteritems():
+ self.set_value(name, value, self.SUBTYPE_PROPERTY)
+
+ if self._classId.get_type() != SchemaClassId.TYPE_EVENT:
+ raise TypeError("Invalid ClassId type for event schema: %s" %
+ self._classId)
+
+ # map constructor
+ def __from_map(cls, map_):
+ return cls(_map=map_)
+ from_map = classmethod(__from_map)
+
+
+
+
+
+
+
+
+
+
+
diff --git a/qpid/python/qmf2/console.py b/qpid/python/qmf2/console.py
new file mode 100644
index 0000000000..8e8f4799f7
--- /dev/null
+++ b/qpid/python/qmf2/console.py
@@ -0,0 +1,1959 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys
+import os
+import logging
+import platform
+import time
+import datetime
+import Queue
+from threading import Thread
+from threading import Lock
+from threading import currentThread
+from threading import Condition
+
+from qpid.messaging import Connection, Message, Empty, SendError
+
+from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
+ QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
+ AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+ SchemaClass, SchemaClassId, SchemaEventClass,
+ SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
+
+
+
+# global flag that indicates which thread (if any) is
+# running the console notifier callback
+_callback_thread=None
+
+
+
+
+##==============================================================================
+## Sequence Manager
+##==============================================================================
+
+class _Mailbox(object):
+ """
+ Virtual base class for all Mailbox-like objects
+ """
+ def __init__(self):
+ self._msgs = []
+ self._cv = Condition()
+ self._waiting = False
+
+ def deliver(self, obj):
+ self._cv.acquire()
+ try:
+ self._msgs.append(obj)
+ # if was empty, notify waiters
+ if len(self._msgs) == 1:
+ self._cv.notify()
+ finally:
+ self._cv.release()
+
+ def fetch(self, timeout=None):
+ self._cv.acquire()
+ try:
+ if len(self._msgs) == 0:
+ self._cv.wait(timeout)
+ if len(self._msgs):
+ return self._msgs.pop()
+ return None
+ finally:
+ self._cv.release()
+
+
+
+class SequencedWaiter(object):
+ """
+ Manage sequence numbers for asynchronous method calls.
+ Allows the caller to associate a generic piece of data with a unique sequence
+ number."""
+
+ def __init__(self):
+ self.lock = Lock()
+ self.sequence = long(time.time()) # pseudo-randomize seq start
+ self.pending = {}
+
+
+ def allocate(self):
+ """
+ Reserve a sequence number.
+
+ @rtype: long
+ @return: a unique nonzero sequence number.
+ """
+ self.lock.acquire()
+ try:
+ seq = self.sequence
+ self.sequence = self.sequence + 1
+ self.pending[seq] = _Mailbox()
+ finally:
+ self.lock.release()
+ logging.debug( "sequence %d allocated" % seq)
+ return seq
+
+
+ def put_data(self, seq, new_data):
+ seq = long(seq)
+ logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) )
+ self.lock.acquire()
+ try:
+ if seq in self.pending:
+ # logging.error("Putting seq %d @ %s" % (seq,time.time()))
+ self.pending[seq].deliver(new_data)
+ else:
+ logging.error( "seq %d not found!" % seq )
+ finally:
+ self.lock.release()
+
+
+
+ def get_data(self, seq, timeout=None):
+ """
+ Release a sequence number reserved using the reserve method. This must
+ be called when the sequence is no longer needed.
+
+ @type seq: int
+ @param seq: a sequence previously allocated by calling reserve().
+ @rtype: any
+ @return: the data originally associated with the reserved sequence number.
+ """
+ seq = long(seq)
+ logging.debug( "getting data for seq=%d" % seq)
+ mbox = None
+ self.lock.acquire()
+ try:
+ if seq in self.pending:
+ mbox = self.pending[seq]
+ finally:
+ self.lock.release()
+
+ # Note well: pending list is unlocked, so we can wait.
+ # we reference mbox locally, so it will not be released
+ # until we are done.
+
+ if mbox:
+ d = mbox.fetch(timeout)
+ logging.debug( "seq %d fetched %r!" % (seq, d) )
+ return d
+
+ logging.debug( "seq %d not found!" % seq )
+ return None
+
+
+ def release(self, seq):
+ """
+ Release the sequence, and its mailbox
+ """
+ seq = long(seq)
+ logging.debug( "releasing seq %d" % seq )
+ self.lock.acquire()
+ try:
+ if seq in self.pending:
+ del self.pending[seq]
+ finally:
+ self.lock.release()
+
+
+ def isValid(self, seq):
+ """
+ True if seq is in use, else False (seq is unknown)
+ """
+ seq = long(seq)
+ self.lock.acquire()
+ try:
+ return seq in self.pending
+ finally:
+ self.lock.release()
+ return False
+
+
+##==============================================================================
+## DATA MODEL
+##==============================================================================
+
+
+class QmfConsoleData(QmfData):
+ """
+ Console's representation of an managed QmfData instance.
+ """
+ def __init__(self, map_, agent, _schema=None):
+ super(QmfConsoleData, self).__init__(_map=map_,
+ _schema=_schema,
+ _const=True)
+ self._agent = agent
+
+ def get_timestamps(self):
+ """
+ Returns a list of timestamps describing the lifecycle of
+ the object. All timestamps are represented by the AMQP
+ timestamp type. [0] = time of last update from Agent,
+ [1] = creation timestamp
+ [2] = deletion timestamp, or zero if not
+ deleted.
+ """
+ return [self._utime, self._ctime, self._dtime]
+
+ def get_create_time(self):
+ """
+ returns the creation timestamp
+ """
+ return self._ctime
+
+ def get_update_time(self):
+ """
+ returns the update timestamp
+ """
+ return self._utime
+
+ def get_delete_time(self):
+ """
+ returns the deletion timestamp, or zero if not yet deleted.
+ """
+ return self._dtime
+
+ def is_deleted(self):
+ """
+ True if deletion timestamp not zero.
+ """
+ return self._dtime != long(0)
+
+ def refresh(self, _reply_handle=None, _timeout=None):
+ """
+ request that the Agent update the value of this object's
+ contents.
+ """
+ if _reply_handle is not None:
+ logging.error(" ASYNC REFRESH TBD!!!")
+ return None
+
+ assert self._agent
+ assert self._agent._console
+
+ if _timeout is None:
+ _timeout = self._agent._console._reply_timeout
+
+
+ # create query to agent using this objects ID
+ oid = self.get_object_id()
+ query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
+ self.get_object_id())
+ obj_list = self._agent._console.doQuery(self._agent, query,
+ timeout=_timeout)
+ if obj_list is None or len(obj_list) != 1:
+ return None
+
+ self._update(obj_list[0])
+ return self
+
+
+ def invoke_method(self, name, _in_args={}, _reply_handle=None,
+ _timeout=None):
+ """
+ invoke the named method.
+ """
+ assert self._agent
+ assert self._agent._console
+
+ oid = self.get_object_id()
+ if oid is None:
+ raise ValueError("Cannot invoke methods on unmanaged objects.")
+
+ if _timeout is None:
+ _timeout = self._agent._console._reply_timeout
+
+ if _in_args:
+ _in_args = _in_args.copy()
+
+ if self._schema:
+ # validate
+ ms = self._schema.get_method(name)
+ if ms is None:
+ raise ValueError("Method '%s' is undefined." % name)
+
+ for aname,prop in ms.get_arguments().iteritems():
+ if aname not in _in_args:
+ if prop.get_default():
+ _in_args[aname] = prop.get_default()
+ elif not prop.is_optional():
+ raise ValueError("Method '%s' requires argument '%s'"
+ % (name, aname))
+ for aname in _in_args.iterkeys():
+ prop = ms.get_argument(aname)
+ if prop is None:
+ raise ValueError("Method '%s' does not define argument"
+ " '%s'" % (name, aname))
+ if "I" not in prop.get_direction():
+ raise ValueError("Method '%s' argument '%s' is not an"
+ " input." % (name, aname))
+
+ # @todo check if value is correct (type, range, etc)
+
+ handle = self._agent._console._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+
+ _map = {self.KEY_OBJECT_ID:str(oid),
+ SchemaMethod.KEY_NAME:name}
+ if _in_args:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+ logging.debug("Sending method req to Agent (%s)" % time.time())
+ try:
+ self._agent._sendMethodReq(_map, handle)
+ except SendError, e:
+ logging.error(str(e))
+ self._agent._console._req_correlation.release(handle)
+ return None
+
+ # @todo async method calls!!!
+ if _reply_handle is not None:
+ print("ASYNC TBD")
+
+ logging.debug("Waiting for response to method req (%s)" % _timeout)
+ replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout)
+ self._agent._console._req_correlation.release(handle)
+ if not replyMsg:
+ logging.debug("Agent method req wait timed-out.")
+ return None
+
+ _map = replyMsg.content.get(MsgKey.method)
+ if not _map:
+ logging.error("Invalid method call reply message")
+ return None
+
+ error=_map.get(SchemaMethod.KEY_ERROR)
+ if error:
+ return MethodResult(_error=QmfData.from_map(error))
+ else:
+ return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
+
+ def _update(self, newer):
+ super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes,
+ _tag=newer._tag, _object_id=newer._object_id,
+ _ctime=newer._ctime, _utime=newer._utime,
+ _dtime=newer._dtime,
+ _schema=newer._schema, _const=True)
+
+class QmfLocalData(QmfData):
+ """
+ Console's representation of an unmanaged QmfData instance. There
+ is no remote agent associated with this instance. The Console has
+ full control over this instance.
+ """
+ def __init__(self, values, _subtypes={}, _tag=None, _object_id=None,
+ _schema=None):
+ # timestamp in millisec since epoch UTC
+ ctime = long(time.time() * 1000)
+ super(QmfLocalData, self).__init__(_values=values,
+ _subtypes=_subtypes, _tag=_tag,
+ _object_id=_object_id,
+ _schema=_schema, _ctime=ctime,
+ _utime=ctime, _const=False)
+
+
+class Agent(object):
+ """
+ A local representation of a remote agent managed by this console.
+ """
+ def __init__(self, name, console):
+ """
+ @type name: AgentId
+ @param name: uniquely identifies this agent in the AMQP domain.
+ """
+
+ if not isinstance(console, Console):
+ raise TypeError("parameter must be an instance of class Console")
+
+ self._name = name
+ self._address = QmfAddress.direct(name, console._domain)
+ self._console = console
+ self._sender = None
+ self._packages = {} # map of {package-name:[list of class-names], } for this agent
+ self._subscriptions = [] # list of active standing subscriptions for this agent
+ self._announce_timestamp = None # datetime when last announce received
+ logging.debug( "Created Agent with address: [%s]" % self._address )
+
+
+ def get_name(self):
+ return self._name
+
+ def isActive(self):
+ return self._announce_timestamp != None
+
+ def _sendMsg(self, msg, correlation_id=None):
+ """
+ Low-level routine to asynchronously send a message to this agent.
+ """
+ msg.reply_to = str(self._console._address)
+ # handle = self._console._req_correlation.allocate()
+ # if handle == 0:
+ # raise Exception("Can not allocate a correlation id!")
+ # msg.correlation_id = str(handle)
+ if correlation_id:
+ msg.correlation_id = str(correlation_id)
+ self._sender.send(msg)
+ # return handle
+
+ def get_packages(self):
+ """
+ Return a list of the names of all packages known to this agent.
+ """
+ return self._packages.keys()
+
+ def get_classes(self):
+ """
+ Return a dictionary [key:class] of classes known to this agent.
+ """
+ return self._packages.copy()
+
+ def get_objects(self, query, kwargs={}):
+ """
+ Return a list of objects that satisfy the given query.
+
+ @type query: dict, or common.Query
+ @param query: filter for requested objects
+ @type kwargs: dict
+ @param kwargs: ??? used to build match selector and query ???
+ @rtype: list
+ @return: list of matching objects, or None.
+ """
+ pass
+
+ def get_object(self, query, kwargs={}):
+ """
+ Get one object - query is expected to match only one object.
+ ??? Recommended: explicit timeout param, default None ???
+
+ @type query: dict, or common.Query
+ @param query: filter for requested objects
+ @type kwargs: dict
+ @param kwargs: ??? used to build match selector and query ???
+ @rtype: qmfConsole.ObjectProxy
+ @return: one matching object, or none
+ """
+ pass
+
+
+ def create_subscription(self, query):
+ """
+ Factory for creating standing subscriptions based on a given query.
+
+ @type query: common.Query object
+ @param query: determines the list of objects for which this subscription applies
+ @rtype: qmfConsole.Subscription
+ @returns: an object representing the standing subscription.
+ """
+ pass
+
+
+ def invoke_method(self, name, _in_args={}, _reply_handle=None,
+ _timeout=None):
+ """
+ """
+ assert self._console
+
+ if _timeout is None:
+ _timeout = self._console._reply_timeout
+
+ if _in_args:
+ _in_args = _in_args.copy()
+
+ handle = self._console._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+
+ _map = {SchemaMethod.KEY_NAME:name}
+ if _in_args:
+ _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+ logging.debug("Sending method req to Agent (%s)" % time.time())
+ try:
+ self._sendMethodReq(_map, handle)
+ except SendError, e:
+ logging.error(str(e))
+ self._console._req_correlation.release(handle)
+ return None
+
+ # @todo async method calls!!!
+ if _reply_handle is not None:
+ print("ASYNC TBD")
+
+ logging.debug("Waiting for response to method req (%s)" % _timeout)
+ replyMsg = self._console._req_correlation.get_data(handle, _timeout)
+ self._console._req_correlation.release(handle)
+ if not replyMsg:
+ logging.debug("Agent method req wait timed-out.")
+ return None
+
+ _map = replyMsg.content.get(MsgKey.method)
+ if not _map:
+ logging.error("Invalid method call reply message")
+ return None
+
+ return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
+ _error=_map.get(SchemaMethod.KEY_ERROR))
+
+ def enable_events(self):
+ raise Exception("enable_events tbd")
+
+ def disable_events(self):
+ raise Exception("disable_events tbd")
+
+ def destroy(self):
+ raise Exception("destroy tbd")
+
+ def __repr__(self):
+ return str(self._address)
+
+ def __str__(self):
+ return self.__repr__()
+
+ def _sendQuery(self, query, correlation_id=None):
+ """
+ """
+ msg = Message(subject=makeSubject(OpCode.get_query),
+ properties={"method":"request"},
+ content={MsgKey.query: query.map_encode()})
+ self._sendMsg( msg, correlation_id )
+
+
+ def _sendMethodReq(self, mr_map, correlation_id=None):
+ """
+ """
+ msg = Message(subject=makeSubject(OpCode.method_req),
+ properties={"method":"request"},
+ content=mr_map)
+ self._sendMsg( msg, correlation_id )
+
+
+ ##==============================================================================
+ ## METHOD CALL
+ ##==============================================================================
+
+class MethodResult(object):
+ def __init__(self, _out_args=None, _error=None):
+ self._error = _error
+ self._out_args = _out_args
+
+ def succeeded(self):
+ return self._error is None
+
+ def get_exception(self):
+ return self._error
+
+ def get_arguments(self):
+ return self._out_args
+
+ def get_argument(self, name):
+ arg = None
+ if self._out_args:
+ arg = self._out_args.get(name)
+ return arg
+
+
+ ##==============================================================================
+ ## CONSOLE
+ ##==============================================================================
+
+
+
+
+
+
+class Console(Thread):
+ """
+ A Console manages communications to a collection of agents on behalf of an application.
+ """
+ def __init__(self, name=None, _domain=None, notifier=None,
+ reply_timeout = 60,
+ # agent_timeout = 120,
+ agent_timeout = 60,
+ kwargs={}):
+ """
+ @type name: str
+ @param name: identifier for this console. Must be unique.
+ @type notifier: qmfConsole.Notifier
+ @param notifier: invoked when events arrive for processing.
+ @type kwargs: dict
+ @param kwargs: ??? Unused
+ """
+ Thread.__init__(self)
+ if not name:
+ self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
+ else:
+ self._name = str(name)
+ self._domain = _domain
+ self._address = QmfAddress.direct(self._name, self._domain)
+ self._notifier = notifier
+ self._lock = Lock()
+ self._conn = None
+ self._session = None
+ # dict of "agent-direct-address":class Agent entries
+ self._agent_map = {}
+ self._direct_recvr = None
+ self._announce_recvr = None
+ self._locate_sender = None
+ self._schema_cache = {}
+ self._req_correlation = SequencedWaiter()
+ self._operational = False
+ self._agent_discovery_filter = None
+ self._reply_timeout = reply_timeout
+ self._agent_timeout = agent_timeout
+ self._next_agent_expire = None
+ # lock out run() thread
+ self._cv = Condition()
+ # for passing WorkItems to the application
+ self._work_q = Queue.Queue()
+ self._work_q_put = False
+ ## Old stuff below???
+ #self._broker_list = []
+ #self.impl = qmfengine.Console()
+ #self._event = qmfengine.ConsoleEvent()
+ ##self._cv = Condition()
+ ##self._sync_count = 0
+ ##self._sync_result = None
+ ##self._select = {}
+ ##self._cb_cond = Condition()
+
+
+
+ def destroy(self, timeout=None):
+ """
+ Must be called before the Console is deleted.
+ Frees up all resources and shuts down all background threads.
+
+ @type timeout: float
+ @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
+ """
+ logging.debug("Destroying Console...")
+ if self._conn:
+ self.removeConnection(self._conn, timeout)
+ logging.debug("Console Destroyed")
+
+
+
+ def addConnection(self, conn):
+ """
+ Add a AMQP connection to the console. The console will setup a session over the
+ connection. The console will then broadcast an Agent Locate Indication over
+ the session in order to discover present agents.
+
+ @type conn: qpid.messaging.Connection
+ @param conn: the connection to the AMQP messaging infrastructure.
+ """
+ if self._conn:
+ raise Exception( "Multiple connections per Console not supported." );
+ self._conn = conn
+ self._session = conn.session(name=self._name)
+ self._direct_recvr = self._session.receiver(str(self._address) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}",
+ capacity=1)
+ logging.debug("my direct addr=%s" % self._direct_recvr.source)
+
+ ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+ self._announce_recvr = self._session.receiver(str(ind_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}",
+ capacity=1)
+ logging.debug("agent.ind addr=%s" % self._announce_recvr.source)
+
+ locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+ self._locate_sender = self._session.sender(str(locate_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}")
+ logging.debug("agent.locate addr=%s" % self._locate_sender.target)
+
+ #
+ # Now that receivers are created, fire off the receive thread...
+ #
+ self._operational = True
+ self.start()
+
+
+
+ def removeConnection(self, conn, timeout=None):
+ """
+ Remove an AMQP connection from the console. Un-does the add_connection() operation,
+ and releases any agents and sessions associated with the connection.
+
+ @type conn: qpid.messaging.Connection
+ @param conn: connection previously added by add_connection()
+ """
+ if self._conn and conn and conn != self._conn:
+ logging.error( "Attempt to delete unknown connection: %s" % str(conn))
+
+ # tell connection thread to shutdown
+ self._operational = False
+ if self.isAlive():
+ # kick my thread to wake it up
+ logging.debug("Making temp sender for [%s]" % self._address)
+ tmp_sender = self._session.sender(str(self._address))
+ try:
+ msg = Message(subject=makeSubject(OpCode.noop))
+ tmp_sender.send( msg, sync=True )
+ except SendError, e:
+ logging.error(str(e))
+ logging.debug("waiting for console receiver thread to exit")
+ self.join(timeout)
+ if self.isAlive():
+ logging.error( "Console thread '%s' is hung..." % self.getName() )
+ self._direct_recvr.close()
+ self._announce_recvr.close()
+ self._locate_sender.close()
+ self._session.close()
+ self._session = None
+ self._conn = None
+ logging.debug("console connection removal complete")
+
+
+ def getAddress(self):
+ """
+ The AMQP address this Console is listening to.
+ """
+ return self._address
+
+
+ def destroyAgent( self, agent ):
+ """
+ Undoes create.
+ """
+ if not isinstance(agent, Agent):
+ raise TypeError("agent must be an instance of class Agent")
+
+ self._lock.acquire()
+ try:
+ if agent._id in self._agent_map:
+ del self._agent_map[agent._id]
+ finally:
+ self._lock.release()
+
+ def find_agent(self, name, timeout=None ):
+ """
+ Given the name of a particular agent, return an instance of class Agent
+ representing that agent. Return None if the agent does not exist.
+ """
+
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(name)
+ if agent:
+ return agent
+ finally:
+ self._lock.release()
+
+ # agent not present yet - ping it with an agent_locate
+
+ handle = self._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+ try:
+ tmp_sender = self._session.sender(str(QmfAddress.direct(name,
+ self._domain))
+ + ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+
+ query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
+ msg = Message(subject=makeSubject(OpCode.agent_locate),
+ properties={"method":"request"},
+ content={MsgKey.query: query.map_encode()})
+ msg.reply_to = str(self._address)
+ msg.correlation_id = str(handle)
+ logging.debug("Sending Agent Locate (%s)" % time.time())
+ tmp_sender.send( msg )
+ except SendError, e:
+ logging.error(str(e))
+ self._req_correlation.release(handle)
+ return None
+
+ if timeout is None:
+ timeout = self._reply_timeout
+
+ new_agent = None
+ logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
+ self._req_correlation.get_data( handle, timeout )
+ self._req_correlation.release(handle)
+ logging.debug("Agent Locate wait ended (%s)" % time.time())
+ self._lock.acquire()
+ try:
+ new_agent = self._agent_map.get(name)
+ finally:
+ self._lock.release()
+ return new_agent
+
+
+ def doQuery(self, agent, query, timeout=None ):
+ """
+ """
+
+ target = query.get_target()
+ handle = self._req_correlation.allocate()
+ if handle == 0:
+ raise Exception("Can not allocate a correlation id!")
+ try:
+ logging.debug("Sending Query to Agent (%s)" % time.time())
+ agent._sendQuery(query, handle)
+ except SendError, e:
+ logging.error(str(e))
+ self._req_correlation.release(handle)
+ return None
+
+ if not timeout:
+ timeout = self._reply_timeout
+
+ logging.debug("Waiting for response to Query (%s)" % timeout)
+ reply = self._req_correlation.get_data(handle, timeout)
+ self._req_correlation.release(handle)
+ if not reply:
+ logging.debug("Agent Query wait timed-out.")
+ return None
+
+ if target == QmfQuery.TARGET_PACKAGES:
+ # simply pass back the list of package names
+ logging.debug("Response to Packet Query received")
+ return reply.content.get(MsgKey.package_info)
+ elif target == QmfQuery.TARGET_OBJECT_ID:
+ # simply pass back the list of object_id's
+ logging.debug("Response to Object Id Query received")
+ return reply.content.get(MsgKey.object_id)
+ elif target == QmfQuery.TARGET_SCHEMA_ID:
+ logging.debug("Response to Schema Id Query received")
+ id_list = []
+ for sid_map in reply.content.get(MsgKey.schema_id):
+ id_list.append(SchemaClassId.from_map(sid_map))
+ return id_list
+ elif target == QmfQuery.TARGET_SCHEMA:
+ logging.debug("Response to Schema Query received")
+ schema_list = []
+ for schema_map in reply.content.get(MsgKey.schema):
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ schema_list.append(schema)
+ self._add_schema(schema)
+ return schema_list
+ elif target == QmfQuery.TARGET_OBJECT:
+ logging.debug("Response to Object Query received")
+ obj_list = []
+ for obj_map in reply.content.get(MsgKey.data_obj):
+ # if the object references a schema, fetch it
+ sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ schema = self._fetch_schema(sid, _agent=agent,
+ _timeout=timeout)
+ if not schema:
+ logging.warning("Unknown schema, id=%s" % sid)
+ continue
+ obj = QmfConsoleData(map_=obj_map, agent=agent,
+ _schema=schema)
+ else:
+ # no schema needed
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ obj_list.append(obj)
+ return obj_list
+ else:
+ logging.warning("Unexpected Target for a Query: '%s'" % target)
+ return None
+
+ def run(self):
+ global _callback_thread
+ #
+ # @todo KAG Rewrite when api supports waiting on multiple receivers
+ #
+ while self._operational:
+
+ # qLen = self._work_q.qsize()
+
+ while True:
+ try:
+ msg = self._announce_recvr.fetch(timeout=0)
+ except Empty:
+ break
+ self._dispatch(msg, _direct=False)
+
+ while True:
+ try:
+ msg = self._direct_recvr.fetch(timeout = 0)
+ except Empty:
+ break
+ self._dispatch(msg, _direct=True)
+
+ for agent in self._agent_map.itervalues():
+ try:
+ msg = agent._event_recvr.fetch(timeout = 0)
+ except Empty:
+ continue
+ self._dispatch(msg, _direct=False)
+
+
+ self._expireAgents() # check for expired agents
+
+ #if qLen == 0 and self._work_q.qsize() and self._notifier:
+ if self._work_q_put and self._notifier:
+ # new stuff on work queue, kick the the application...
+ self._work_q_put = False
+ _callback_thread = currentThread()
+ logging.info("Calling console notifier.indication")
+ self._notifier.indication()
+ _callback_thread = None
+
+ if self._operational:
+ # wait for a message to arrive or an agent
+ # to expire
+ now = datetime.datetime.utcnow()
+ if self._next_agent_expire > now:
+ timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
+ try:
+ logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
+ xxx = self._session.next_receiver(timeout = timeout)
+ except Empty:
+ pass
+
+
+ logging.debug("Shutting down Console thread")
+
+ def get_objects(self,
+ _schema_id=None,
+ _pname=None, _cname=None,
+ _object_id=None,
+ _agents=None,
+ _timeout=None):
+ """
+ @todo
+ """
+ if _object_id is not None:
+ # query by object id
+ query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id)
+ elif _schema_id is not None:
+ pred = QmfQueryPredicate({QmfQuery.CMP_EQ:
+ [QmfData.KEY_SCHEMA_ID,
+ _schema_id.map_encode()]})
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
+ elif _pname is not None:
+ # query by package name (and maybe class name)
+ if _cname is not None:
+ pred = QmfQueryPredicate({QmfQuery.LOGIC_AND:
+ [{QmfQuery.CMP_EQ:
+ [SchemaClassId.KEY_PACKAGE,
+ _pname]},
+ {QmfQuery.CMP_EQ:
+ [SchemaClassId.KEY_CLASS,
+ _cname]}]})
+ else:
+ pred = QmfQueryPredicate({QmfQuery.CMP_EQ:
+ [SchemaClassId.KEY_PACKAGE,
+ _pname]})
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
+
+ else:
+ raise Exception("invalid arguments")
+
+ if _agents is None:
+ # use copy of current agent list
+ self._lock.acquire()
+ try:
+ agent_list = self._agent_map.values()
+ finally:
+ self._lock.release()
+ elif isinstance(_agents, Agent):
+ agent_list = [_agents]
+ else:
+ agent_list = _agents
+ # @todo validate this list!
+
+ # @todo: fix when async doQuery done - query all agents at once, then
+ # wait for replies, instead of per-agent querying....
+
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ obj_list = []
+ expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
+ for agent in agent_list:
+ if not agent.isActive():
+ continue
+ now = datetime.datetime.utcnow()
+ if now >= expired:
+ break
+ timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds
+ reply = self.doQuery(agent, query, timeout)
+ if reply:
+ obj_list = obj_list + reply
+
+ if obj_list:
+ return obj_list
+ return None
+
+
+
+ # called by run() thread ONLY
+ #
+ def _dispatch(self, msg, _direct=True):
+ """
+ PRIVATE: Process a message received from an Agent
+ """
+ logging.debug( "Message received from Agent! [%s]" % msg )
+ try:
+ version,opcode = parseSubject(msg.subject)
+ # @todo: deal with version mismatch!!!
+ except:
+ logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject)
+ return
+
+ cmap = {}; props = {}
+ if msg.content_type == "amqp/map":
+ cmap = msg.content
+ if msg.properties:
+ props = msg.properties
+
+ if opcode == OpCode.agent_ind:
+ self._handleAgentIndMsg( msg, cmap, version, _direct )
+ elif opcode == OpCode.data_ind:
+ self._handleDataIndMsg(msg, cmap, version, _direct)
+ elif opcode == OpCode.event_ind:
+ self._handleEventIndMsg(msg, cmap, version, _direct)
+ elif opcode == OpCode.managed_object:
+ logging.warning("!!! managed_object TBD !!!")
+ elif opcode == OpCode.object_ind:
+ logging.warning("!!! object_ind TBD !!!")
+ elif opcode == OpCode.response:
+ self._handleResponseMsg(msg, cmap, version, _direct)
+ elif opcode == OpCode.schema_ind:
+ logging.warning("!!! schema_ind TBD !!!")
+ elif opcode == OpCode.noop:
+ logging.debug("No-op msg received.")
+ else:
+ logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
+
+
+ def _handleAgentIndMsg(self, msg, cmap, version, direct):
+ """
+ Process a received agent-ind message. This message may be a response to a
+ agent-locate, or it can be an unsolicited agent announce.
+ """
+ logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time()))
+
+ ai_map = cmap.get(MsgKey.agent_info)
+ if not ai_map or not isinstance(ai_map, type({})):
+ logging.warning("Bad agent-ind message received: '%s'" % msg)
+ return
+ name = ai_map.get("_name")
+ if not name:
+ logging.warning("Bad agent-ind message received: agent name missing"
+ " '%s'" % msg)
+ return
+
+ ignore = True
+ matched = False
+ correlated = False
+ agent_query = self._agent_discovery_filter
+
+ if msg.correlation_id:
+ correlated = self._req_correlation.isValid(msg.correlation_id)
+
+ if direct and correlated:
+ ignore = False
+ elif agent_query:
+ matched = agent_query.evaluate(QmfData.create(values=ai_map))
+ ignore = not matched
+
+ if not ignore:
+ agent = None
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(name)
+ finally:
+ self._lock.release()
+
+ if not agent:
+ # need to create and add a new agent
+ agent = self._createAgent(name)
+ if not agent:
+ return # failed to add agent
+
+ # lock out expiration scanning code
+ self._lock.acquire()
+ try:
+ old_timestamp = agent._announce_timestamp
+ agent._announce_timestamp = datetime.datetime.utcnow()
+ finally:
+ self._lock.release()
+
+ if old_timestamp == None and matched:
+ logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+ wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
+ self._work_q.put(wi)
+ self._work_q_put = True
+
+ if correlated:
+ # wake up all waiters
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
+
+
+
+
+ def _handleDataIndMsg(self, msg, cmap, version, direct):
+ """
+ Process a received data-ind message.
+ """
+ logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time()))
+
+ if not self._req_correlation.isValid(msg.correlation_id):
+ logging.debug("Data indicate received with unknown correlation_id"
+ " msg='%s'" % str(msg))
+ return
+
+ # wake up all waiters
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
+
+
+ def _handleResponseMsg(self, msg, cmap, version, direct):
+ """
+ Process a received data-ind message.
+ """
+ # @todo code replication - clean me.
+ logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time()))
+
+ if not self._req_correlation.isValid(msg.correlation_id):
+ logging.debug("Response msg received with unknown correlation_id"
+ " msg='%s'" % str(msg))
+ return
+
+ # wake up all waiters
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
+
+ def _handleEventIndMsg(self, msg, cmap, version, _direct):
+ ei_map = cmap.get(MsgKey.event)
+ if not ei_map or not isinstance(ei_map, type({})):
+ logging.warning("Bad event indication message received: '%s'" % msg)
+ return
+
+ aname = ei_map.get("_name")
+ emap = ei_map.get("_event")
+ if not aname:
+ logging.debug("No '_name' field in event indication message.")
+ return
+ if not emap:
+ logging.debug("No '_event' field in event indication message.")
+ return
+ # @todo: do I need to lock this???
+ agent = self._agent_map.get(aname)
+ if not agent:
+ logging.debug("Agent '%s' not known." % aname)
+ return
+ try:
+ # @todo: schema???
+ event = QmfEvent.from_map(emap)
+ except TypeError:
+ logging.debug("Invalid QmfEvent map received: %s" % str(emap))
+ return
+
+ # @todo: schema? Need to fetch it, but not from this thread!
+ # This thread can not pend on a request.
+ logging.debug("Publishing event received from agent %s" % aname)
+ wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
+ {"agent":agent,
+ "event":event})
+ self._work_q.put(wi)
+ self._work_q_put = True
+
+
+ def _expireAgents(self):
+ """
+ Check for expired agents and issue notifications when they expire.
+ """
+ now = datetime.datetime.utcnow()
+ if self._next_agent_expire and now < self._next_agent_expire:
+ return
+ lifetime_delta = datetime.timedelta(seconds = self._agent_timeout)
+ next_expire_delta = lifetime_delta
+ self._lock.acquire()
+ try:
+ logging.debug("!!! expiring agents '%s'" % now)
+ for agent in self._agent_map.itervalues():
+ if agent._announce_timestamp:
+ agent_deathtime = agent._announce_timestamp + lifetime_delta
+ if agent_deathtime <= now:
+ logging.debug("AGENT_DELETED for %s" % agent)
+ agent._announce_timestamp = None
+ wi = WorkItem(WorkItem.AGENT_DELETED, None,
+ {"agent":agent})
+ # @todo: remove agent from self._agent_map
+ self._work_q.put(wi)
+ self._work_q_put = True
+ else:
+ if (agent_deathtime - now) < next_expire_delta:
+ next_expire_delta = agent_deathtime - now
+
+ self._next_agent_expire = now + next_expire_delta
+ logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
+ finally:
+ self._lock.release()
+
+
+
+ def _createAgent( self, name ):
+ """
+ Factory to create/retrieve an agent for this console
+ """
+ logging.debug("creating agent %s" % name)
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(name)
+ if agent:
+ return agent
+
+ agent = Agent(name, self)
+ try:
+ agent._sender = self._session.sender(str(agent._address) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+ except:
+ logging.warning("Unable to create sender for %s" % name)
+ return None
+ logging.debug("created agent sender %s" % agent._sender.target)
+
+ events_addr = QmfAddress.topic(name, self._domain)
+ try:
+ agent._event_recvr = self._session.receiver(str(events_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}",
+ capacity=1)
+ except:
+ logging.warning("Unable to create event receiver for %s" % name)
+ return None
+ logging.debug("created agent event receiver %s" % agent._event_recvr.source)
+
+ self._agent_map[name] = agent
+ finally:
+ self._lock.release()
+
+ # new agent - query for its schema database for
+ # seeding the schema cache (@todo)
+ # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None})
+ # agent._sendQuery( query )
+
+ return agent
+
+
+
+ def enable_agent_discovery(self, _query=None):
+ """
+ Called to enable the asynchronous Agent Discovery process.
+ Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
+ """
+ # @todo: fix - take predicate only, not entire query!
+ if _query is not None:
+ if (not isinstance(_query, QmfQuery) or
+ _query.get_target() != QmfQuery.TARGET_AGENT):
+ raise TypeError("Type QmfQuery with target == TARGET_AGENT expected")
+ self._agent_discovery_filter = _query
+ else:
+ # create a match-all agent query (no predicate)
+ self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT)
+
+ def disable_agent_discovery(self):
+ """
+ Called to disable the async Agent Discovery process enabled by
+ calling enableAgentDiscovery()
+ """
+ self._agent_discovery_filter = None
+
+
+
+ def get_workitem_count(self):
+ """
+ Returns the count of pending WorkItems that can be retrieved.
+ """
+ return self._work_q.qsize()
+
+
+
+ def get_next_workitem(self, timeout=None):
+ """
+ Returns the next pending work item, or None if none available.
+ @todo: subclass and return an Empty event instead.
+ """
+ try:
+ wi = self._work_q.get(True, timeout)
+ except Queue.Empty:
+ return None
+ return wi
+
+
+ def release_workitem(self, wi):
+ """
+ Return a WorkItem to the Console when it is no longer needed.
+ @todo: call Queue.task_done() - only 2.5+
+
+ @type wi: class qmfConsole.WorkItem
+ @param wi: work item object to return.
+ """
+ pass
+
+ def _add_schema(self, schema):
+ """
+ @todo
+ """
+ if not isinstance(schema, SchemaClass):
+ raise TypeError("SchemaClass type expected")
+
+ self._lock.acquire()
+ try:
+ sid = schema.get_class_id()
+ if not self._schema_cache.has_key(sid):
+ self._schema_cache[sid] = schema
+ finally:
+ self._lock.release()
+
+ def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
+ """
+ Find the schema identified by schema_id. If not in the cache, ask the
+ agent for it.
+ """
+ if not isinstance(schema_id, SchemaClassId):
+ raise TypeError("SchemaClassId type expected")
+
+ self._lock.acquire()
+ try:
+ schema = self._schema_cache.get(schema_id)
+ if schema:
+ return schema
+ finally:
+ self._lock.release()
+
+ if _agent is None:
+ return None
+
+ # note: doQuery will add the new schema to the cache automatically.
+ slist = self.doQuery(_agent,
+ QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
+ _timeout)
+ if slist:
+ return slist[0]
+ else:
+ return None
+
+
+
+ # def get_packages(self):
+ # plist = []
+ # for i in range(self.impl.packageCount()):
+ # plist.append(self.impl.getPackageName(i))
+ # return plist
+
+
+ # def get_classes(self, package, kind=CLASS_OBJECT):
+ # clist = []
+ # for i in range(self.impl.classCount(package)):
+ # key = self.impl.getClass(package, i)
+ # class_kind = self.impl.getClassKind(key)
+ # if class_kind == kind:
+ # if kind == CLASS_OBJECT:
+ # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)}))
+ # elif kind == CLASS_EVENT:
+ # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)}))
+ # return clist
+
+
+ # def bind_package(self, package):
+ # return self.impl.bindPackage(package)
+
+
+ # def bind_class(self, kwargs = {}):
+ # if "key" in kwargs:
+ # self.impl.bindClass(kwargs["key"])
+ # elif "package" in kwargs:
+ # package = kwargs["package"]
+ # if "class" in kwargs:
+ # self.impl.bindClass(package, kwargs["class"])
+ # else:
+ # self.impl.bindClass(package)
+ # else:
+ # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
+
+
+ # def get_agents(self, broker=None):
+ # blist = []
+ # if broker:
+ # blist.append(broker)
+ # else:
+ # self._cv.acquire()
+ # try:
+ # # copy while holding lock
+ # blist = self._broker_list[:]
+ # finally:
+ # self._cv.release()
+
+ # agents = []
+ # for b in blist:
+ # for idx in range(b.impl.agentCount()):
+ # agents.append(AgentProxy(b.impl.getAgent(idx), b))
+
+ # return agents
+
+
+ # def get_objects(self, query, kwargs = {}):
+ # timeout = 30
+ # agent = None
+ # temp_args = kwargs.copy()
+ # if type(query) == type({}):
+ # temp_args.update(query)
+
+ # if "_timeout" in temp_args:
+ # timeout = temp_args["_timeout"]
+ # temp_args.pop("_timeout")
+
+ # if "_agent" in temp_args:
+ # agent = temp_args["_agent"]
+ # temp_args.pop("_agent")
+
+ # if type(query) == type({}):
+ # query = Query(temp_args)
+
+ # self._select = {}
+ # for k in temp_args.iterkeys():
+ # if type(k) == str:
+ # self._select[k] = temp_args[k]
+
+ # self._cv.acquire()
+ # try:
+ # self._sync_count = 1
+ # self._sync_result = []
+ # broker = self._broker_list[0]
+ # broker.send_query(query.impl, None, agent)
+ # self._cv.wait(timeout)
+ # if self._sync_count == 1:
+ # raise Exception("Timed out: waiting for query response")
+ # finally:
+ # self._cv.release()
+
+ # return self._sync_result
+
+
+ # def get_object(self, query, kwargs = {}):
+ # '''
+ # Return one and only one object or None.
+ # '''
+ # objs = objects(query, kwargs)
+ # if len(objs) == 1:
+ # return objs[0]
+ # else:
+ # return None
+
+
+ # def first_object(self, query, kwargs = {}):
+ # '''
+ # Return the first of potentially many objects.
+ # '''
+ # objs = objects(query, kwargs)
+ # if objs:
+ # return objs[0]
+ # else:
+ # return None
+
+
+ # # Check the object against select to check for a match
+ # def _select_match(self, object):
+ # schema_props = object.properties()
+ # for key in self._select.iterkeys():
+ # for prop in schema_props:
+ # if key == p[0].name() and self._select[key] != p[1]:
+ # return False
+ # return True
+
+
+ # def _get_result(self, list, context):
+ # '''
+ # Called by Broker proxy to return the result of a query.
+ # '''
+ # self._cv.acquire()
+ # try:
+ # for item in list:
+ # if self._select_match(item):
+ # self._sync_result.append(item)
+ # self._sync_count -= 1
+ # self._cv.notify()
+ # finally:
+ # self._cv.release()
+
+
+ # def start_sync(self, query): pass
+
+
+ # def touch_sync(self, sync): pass
+
+
+ # def end_sync(self, sync): pass
+
+
+
+
+# def start_console_events(self):
+# self._cb_cond.acquire()
+# try:
+# self._cb_cond.notify()
+# finally:
+# self._cb_cond.release()
+
+
+# def _do_console_events(self):
+# '''
+# Called by the Console thread to poll for events. Passes the events
+# onto the ConsoleHandler associated with this Console. Is called
+# periodically, but can also be kicked by Console.start_console_events().
+# '''
+# count = 0
+# valid = self.impl.getEvent(self._event)
+# while valid:
+# count += 1
+# try:
+# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
+# logging.debug("Console Event AGENT_ADDED received")
+# if self._handler:
+# self._handler.agent_added(AgentProxy(self._event.agent, None))
+# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
+# logging.debug("Console Event AGENT_DELETED received")
+# if self._handler:
+# self._handler.agent_deleted(AgentProxy(self._event.agent, None))
+# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
+# logging.debug("Console Event NEW_PACKAGE received")
+# if self._handler:
+# self._handler.new_package(self._event.name)
+# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
+# logging.debug("Console Event NEW_CLASS received")
+# if self._handler:
+# self._handler.new_class(SchemaClassKey(self._event.classKey))
+# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
+# logging.debug("Console Event OBJECT_UPDATE received")
+# if self._handler:
+# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
+# self._event.hasProps, self._event.hasStats)
+# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
+# logging.debug("Console Event EVENT_RECEIVED received")
+# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
+# logging.debug("Console Event AGENT_HEARTBEAT received")
+# if self._handler:
+# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
+# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
+# logging.debug("Console Event METHOD_RESPONSE received")
+# else:
+# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
+# except e:
+# print "Exception caught in callback thread:", e
+# self.impl.popEvent()
+# valid = self.impl.getEvent(self._event)
+# return count
+
+
+
+
+
+# class Broker(ConnectionHandler):
+# # attr_reader :impl :conn, :console, :broker_bank
+# def __init__(self, console, conn):
+# self.broker_bank = 1
+# self.console = console
+# self.conn = conn
+# self._session = None
+# self._cv = Condition()
+# self._stable = None
+# self._event = qmfengine.BrokerEvent()
+# self._xmtMessage = qmfengine.Message()
+# self.impl = qmfengine.BrokerProxy(self.console.impl)
+# self.console.impl.addConnection(self.impl, self)
+# self.conn.add_conn_handler(self)
+# self._operational = True
+
+
+# def shutdown(self):
+# logging.debug("broker.shutdown() called.")
+# self.console.impl.delConnection(self.impl)
+# self.conn.del_conn_handler(self)
+# if self._session:
+# self.impl.sessionClosed()
+# logging.debug("broker.shutdown() sessionClosed done.")
+# self._session.destroy()
+# logging.debug("broker.shutdown() session destroy done.")
+# self._session = None
+# self._operational = False
+# logging.debug("broker.shutdown() done.")
+
+
+# def wait_for_stable(self, timeout = None):
+# self._cv.acquire()
+# try:
+# if self._stable:
+# return
+# if timeout:
+# self._cv.wait(timeout)
+# if not self._stable:
+# raise Exception("Timed out: waiting for broker connection to become stable")
+# else:
+# while not self._stable:
+# self._cv.wait()
+# finally:
+# self._cv.release()
+
+
+# def send_query(self, query, ctx, agent):
+# agent_impl = None
+# if agent:
+# agent_impl = agent.impl
+# self.impl.sendQuery(query, ctx, agent_impl)
+# self.conn.kick()
+
+
+# def _do_broker_events(self):
+# count = 0
+# valid = self.impl.getEvent(self._event)
+# while valid:
+# count += 1
+# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
+# logging.debug("Broker Event BROKER_INFO received");
+# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
+# logging.debug("Broker Event DECLARE_QUEUE received");
+# self.conn.impl.declareQueue(self._session.handle, self._event.name)
+# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
+# logging.debug("Broker Event DELETE_QUEUE received");
+# self.conn.impl.deleteQueue(self._session.handle, self._event.name)
+# elif self._event.kind == qmfengine.BrokerEvent.BIND:
+# logging.debug("Broker Event BIND received");
+# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+# elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
+# logging.debug("Broker Event UNBIND received");
+# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
+# logging.debug("Broker Event SETUP_COMPLETE received");
+# self.impl.startProtocol()
+# elif self._event.kind == qmfengine.BrokerEvent.STABLE:
+# logging.debug("Broker Event STABLE received");
+# self._cv.acquire()
+# try:
+# self._stable = True
+# self._cv.notify()
+# finally:
+# self._cv.release()
+# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE:
+# result = []
+# for idx in range(self._event.queryResponse.getObjectCount()):
+# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self}))
+# self.console._get_result(result, self._event.context)
+# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE:
+# obj = self._event.context
+# obj._method_result(MethodResponse(self._event.methodResponse()))
+
+# self.impl.popEvent()
+# valid = self.impl.getEvent(self._event)
+
+# return count
+
+
+# def _do_broker_messages(self):
+# count = 0
+# valid = self.impl.getXmtMessage(self._xmtMessage)
+# while valid:
+# count += 1
+# logging.debug("Broker: sending msg on connection")
+# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
+# self.impl.popXmt()
+# valid = self.impl.getXmtMessage(self._xmtMessage)
+
+# return count
+
+
+# def _do_events(self):
+# while True:
+# self.console.start_console_events()
+# bcnt = self._do_broker_events()
+# mcnt = self._do_broker_messages()
+# if bcnt == 0 and mcnt == 0:
+# break;
+
+
+# def conn_event_connected(self):
+# logging.debug("Broker: Connection event CONNECTED")
+# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
+# self.impl.sessionOpened(self._session.handle)
+# self._do_events()
+
+
+# def conn_event_disconnected(self, error):
+# logging.debug("Broker: Connection event DISCONNECTED")
+# pass
+
+
+# def conn_event_visit(self):
+# self._do_events()
+
+
+# def sess_event_session_closed(self, context, error):
+# logging.debug("Broker: Session event CLOSED")
+# self.impl.sessionClosed()
+
+
+# def sess_event_recv(self, context, message):
+# logging.debug("Broker: Session event MSG_RECV")
+# if not self._operational:
+# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
+# self.impl.handleRcvMessage(message)
+# self._do_events()
+
+
+
+################################################################################
+################################################################################
+################################################################################
+################################################################################
+# TEMPORARY TEST CODE - TO BE DELETED
+################################################################################
+################################################################################
+################################################################################
+################################################################################
+
+if __name__ == '__main__':
+ # temp test code
+ from common import (qmfTypes, QmfEvent, SchemaProperty)
+
+ logging.getLogger().setLevel(logging.INFO)
+
+ logging.info( "************* Creating Async Console **************" )
+
+ class MyNotifier(Notifier):
+ def __init__(self, context):
+ self._myContext = context
+ self.WorkAvailable = False
+
+ def indication(self):
+ print("Indication received! context=%d" % self._myContext)
+ self.WorkAvailable = True
+
+ _noteMe = MyNotifier( 666 )
+
+ _myConsole = Console(notifier=_noteMe)
+
+ _myConsole.enable_agent_discovery()
+ logging.info("Waiting...")
+
+
+ logging.info( "Destroying console:" )
+ _myConsole.destroy( 10 )
+
+ logging.info( "******** Messing around with Schema ********" )
+
+ _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A typical event schema",
+ _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8,
+ kwargs = {"min":0,
+ "max":100,
+ "unit":"seconds",
+ "desc":"sleep value"}),
+ "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR,
+ kwargs={"maxlen":100,
+ "desc":"a string argument"})})
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec.gePropertyCount()=%d" % _sec.get_property_count() )
+ print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') )
+ print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') )
+ try:
+ print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') )
+ except:
+ pass
+ print("_sec.getProperties()='%s'" % _sec.get_properties())
+
+ print("Adding another argument")
+ _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL,
+ kwargs={"dir":"IO",
+ "desc":"a boolean argument"})
+ _sec.add_property('Argument-3', _arg3)
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
+ print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
+ print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
+ print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') )
+
+ print("_arg3.mapEncode()='%s'" % _arg3.map_encode() )
+
+ _secmap = _sec.map_encode()
+ print("_sec.mapEncode()='%s'" % _secmap )
+
+ _sec2 = SchemaEventClass( _map=_secmap )
+
+ print("_sec=%s" % _sec.get_class_id())
+ print("_sec2=%s" % _sec2.get_class_id())
+
+ _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage",
+ "_class_name": "myOtherClass",
+ "_type": "_data"},
+ "_desc": "A test data object",
+ "_values":
+ {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
+ "access": "RO",
+ "index": True,
+ "unit": "degrees"},
+ "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
+ "access": "RW",
+ "index": True,
+ "desc": "The Second Property(tm)",
+ "unit": "radians"},
+ "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
+ "unit": "seconds",
+ "desc": "time until I retire"},
+ "meth1": {"_desc": "A test method",
+ "_arguments":
+ {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+ "desc": "an argument 1",
+ "dir": "I"},
+ "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
+ "dir": "IO",
+ "desc": "some weird boolean"}}},
+ "meth2": {"_desc": "A test method",
+ "_arguments":
+ {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+ "desc": "an 'nuther argument",
+ "dir":
+ "I"}}}},
+ "_subtypes":
+ {"prop1":"qmfProperty",
+ "prop2":"qmfProperty",
+ "statistics":"qmfProperty",
+ "meth1":"qmfMethod",
+ "meth2":"qmfMethod"},
+ "_primary_key_names": ["prop2", "prop1"]})
+
+ print("_soc='%s'" % _soc)
+
+ print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names())
+
+ print("_soc.getPropertyCount='%d'" % _soc.get_property_count())
+ print("_soc.getProperties='%s'" % _soc.get_properties())
+ print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2'))
+
+ print("_soc.getMethodCount='%d'" % _soc.get_method_count())
+ print("_soc.getMethods='%s'" % _soc.get_methods())
+ print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2'))
+
+ _socmap = _soc.map_encode()
+ print("_socmap='%s'" % _socmap)
+ _soc2 = SchemaObjectClass( _map=_socmap )
+ print("_soc='%s'" % _soc)
+ print("_soc2='%s'" % _soc2)
+
+ if _soc2.get_class_id() == _soc.get_class_id():
+ print("soc and soc2 are the same schema")
+
+
+ logging.info( "******** Messing around with ObjectIds ********" )
+
+
+ qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
+ print("qd='%s':" % qd)
+
+ print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4))
+
+ print("qd map='%s'" % qd.map_encode())
+ print("qd getProperty('prop4')='%s'" % qd.get_value("prop4"))
+ qd.set_value("prop4", 4, "A test property called 4")
+ print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4"))
+ qd.prop4 = 9
+ print("qd.prop4 = 9 ='%s'" % qd.prop4)
+ qd["prop4"] = 11
+ print("qd[prop4] = 11 ='%s'" % qd["prop4"])
+
+ print("qd.mapEncode()='%s'" % qd.map_encode())
+ _qd2 = QmfData( _map = qd.map_encode() )
+ print("_qd2.mapEncode()='%s'" % _qd2.map_encode())
+
+ _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666,
+ "prop2": 0}},
+ agent="some agent name?",
+ _schema = _soc)
+
+ print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode())
+
+ _qmfDesc1._set_schema( _soc )
+
+ print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2"))
+ print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id())
+ print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id())
+
+
+ _qmfDescMap = _qmfDesc1.map_encode()
+ print("_qmfDescMap='%s'" % _qmfDescMap)
+
+ _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc )
+
+ print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode())
+ print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2"))
+ print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id())
+
+
+ logging.info( "******** Messing around with QmfEvents ********" )
+
+
+ _qmfevent1 = QmfEvent( _timestamp = 1111,
+ _schema = _sec,
+ _values = {"Argument-1": 77,
+ "Argument-3": True,
+ "Argument-2": "a string"})
+ print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode())
+ print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp())
+
+ _qmfevent1Map = _qmfevent1.map_encode()
+
+ _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec)
+ print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode())
+
+
+ logging.info( "******** Messing around with Queries ********" )
+
+ _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
+ QmfQueryPredicate({QmfQuery.LOGIC_AND:
+ [{QmfQuery.CMP_EQ: ["vendor", "AVendor"]},
+ {QmfQuery.CMP_EQ: ["product", "SomeProduct"]},
+ {QmfQuery.CMP_EQ: ["name", "Thingy"]},
+ {QmfQuery.LOGIC_OR:
+ [{QmfQuery.CMP_LE: ["temperature", -10]},
+ {QmfQuery.CMP_FALSE: None},
+ {QmfQuery.CMP_EXISTS: ["namey"]}]}]}))
+
+ print("_q1.mapEncode() = [%s]" % _q1.map_encode())
diff --git a/qpid/python/qmf2/tests/__init__.py b/qpid/python/qmf2/tests/__init__.py
new file mode 100644
index 0000000000..2e742b79be
--- /dev/null
+++ b/qpid/python/qmf2/tests/__init__.py
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import agent_discovery, basic_query, basic_method, obj_gets, events
diff --git a/qpid/python/qmf2/tests/agent_discovery.py b/qpid/python/qmf2/tests/agent_discovery.py
new file mode 100644
index 0000000000..19ed79cbc2
--- /dev/null
+++ b/qpid/python/qmf2/tests/agent_discovery.py
@@ -0,0 +1,320 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+import qmf2.common
+import qmf2.console
+import qmf2.agent
+
+
+class _testNotifier(qmf.qmfCommon.Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.agent = qmf.qmfAgent.Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+ # No database needed for this test
+ self.running = True
+ self.start()
+
+ def connect_agent(self, broker_url):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(broker_url.host,
+ broker_url.port,
+ broker_url.user,
+ broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+
+ def disconnect_agent(self, timeout):
+ if self.conn:
+ self.agent.remove_connection(timeout)
+
+ def shutdown_agent(self, timeout):
+ self.agent.destroy(timeout)
+
+ def stop(self):
+ self.running = False
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent1 = _agentApp("agent1", 1)
+ self.agent1.connect_agent(self.broker)
+ self.agent2 = _agentApp("agent2", 1)
+ self.agent2.connect_agent(self.broker)
+
+ def tearDown(self):
+ if self.agent1:
+ self.agent1.shutdown_agent(10)
+ self.agent1.stop()
+ self.agent1 = None
+ if self.agent2:
+ self.agent2.shutdown_agent(10)
+ self.agent2.stop()
+ self.agent2 = None
+
+ def test_discover_all(self):
+ # create console
+ # enable agent discovery
+ # wait
+ # expect agent add for agent1 and agent2
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+ self.console.enable_agent_discovery()
+
+ agent1_found = agent2_found = False
+ wi = self.console.get_next_workitem(timeout=3)
+ while wi and not (agent1_found and agent2_found):
+ if wi.get_type() == wi.AGENT_ADDED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = True
+ elif agent.get_name() == "agent2":
+ agent2_found = True
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+ if agent1_found and agent2_found:
+ break;
+
+ wi = self.console.get_next_workitem(timeout=3)
+
+ self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+ self.console.destroy(10)
+
+
+ def test_discover_one(self):
+ # create console
+ # enable agent discovery, filter for agent1 only
+ # wait until timeout
+ # expect agent add for agent1 only
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ query = qmf.qmfCommon.QmfQuery.create_predicate(
+ qmf.qmfCommon.QmfQuery.TARGET_AGENT,
+ qmf.qmfCommon.QmfQueryPredicate({qmf.qmfCommon.QmfQuery.CMP_EQ:
+ [qmf.qmfCommon.QmfQuery.KEY_AGENT_NAME, "agent1"]}))
+ self.console.enable_agent_discovery(query)
+
+ agent1_found = agent2_found = False
+ wi = self.console.get_next_workitem(timeout=3)
+ while wi:
+ if wi.get_type() == wi.AGENT_ADDED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = True
+ elif agent.get_name() == "agent2":
+ agent2_found = True
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+
+ wi = self.console.get_next_workitem(timeout=2)
+
+ self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered")
+
+ self.console.destroy(10)
+
+
+ def test_heartbeat(self):
+ # create console with 2 sec agent timeout
+ # enable agent discovery, find all agents
+ # stop agent1, expect timeout notification
+ # stop agent2, expect timeout notification
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=2)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+ self.console.enable_agent_discovery()
+
+ agent1_found = agent2_found = False
+ wi = self.console.get_next_workitem(timeout=4)
+ while wi and not (agent1_found and agent2_found):
+ if wi.get_type() == wi.AGENT_ADDED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = True
+ elif agent.get_name() == "agent2":
+ agent2_found = True
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+ if agent1_found and agent2_found:
+ break;
+
+ wi = self.console.get_next_workitem(timeout=4)
+
+ self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+ # now kill agent1 and wait for expiration
+
+ agent1 = self.agent1
+ self.agent1 = None
+ agent1.shutdown_agent(10)
+ agent1.stop()
+
+ wi = self.console.get_next_workitem(timeout=4)
+ while wi is not None:
+ if wi.get_type() == wi.AGENT_DELETED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = False
+ else:
+ self.fail("Unexpected agent_deleted received: %s" %
+ agent.get_name())
+ if not agent1_found:
+ break;
+
+ wi = self.console.get_next_workitem(timeout=4)
+
+ self.assertFalse(agent1_found, "agent1 did not delete!")
+
+ # now kill agent2 and wait for expiration
+
+ agent2 = self.agent2
+ self.agent2 = None
+ agent2.shutdown_agent(10)
+ agent2.stop()
+
+ wi = self.console.get_next_workitem(timeout=4)
+ while wi is not None:
+ if wi.get_type() == wi.AGENT_DELETED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent2":
+ agent2_found = False
+ else:
+ self.fail("Unexpected agent_deleted received: %s" %
+ agent.get_name())
+ if not agent2_found:
+ break;
+
+ wi = self.console.get_next_workitem(timeout=4)
+
+ self.assertFalse(agent2_found, "agent2 did not delete!")
+
+ self.console.destroy(10)
+
+
+ def test_find_agent(self):
+ # create console
+ # do not enable agent discovery
+ # find agent1, expect success
+ # find agent-none, expect failure
+ # find agent2, expect success
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ agent1 = self.console.find_agent("agent1", timeout=3)
+ self.assertTrue(agent1 and agent1.get_name() == "agent1")
+
+ no_agent = self.console.find_agent("agent-none", timeout=3)
+ self.assertTrue(no_agent == None)
+
+ agent2 = self.console.find_agent("agent2", timeout=3)
+ self.assertTrue(agent2 and agent2.get_name() == "agent2")
+
+ self.console.removeConnection(self.conn, 10)
+ self.console.destroy(10)
+
+
diff --git a/qpid/python/qmf2/tests/agent_test.py b/qpid/python/qmf2/tests/agent_test.py
new file mode 100644
index 0000000000..14d8ada197
--- /dev/null
+++ b/qpid/python/qmf2/tests/agent_test.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import logging
+import time
+import unittest
+from threading import Semaphore
+
+
+from qpid.messaging import *
+from qmf2.common import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
+ QmfEvent, SchemaMethod, Notifier, SchemaClassId,
+ WorkItem)
+from qmf2.agent import (Agent, QmfAgentData)
+
+
+
+class ExampleNotifier(Notifier):
+ def __init__(self):
+ self._sema4 = Semaphore(0) # locked
+
+ def indication(self):
+ self._sema4.release()
+
+ def waitForWork(self):
+ print("Waiting for event...")
+ self._sema4.acquire()
+ print("...event present")
+
+
+
+
+class QmfTest(unittest.TestCase):
+ def test_begin(self):
+ print("!!! being test")
+
+ def test_end(self):
+ print("!!! end test")
+
+
+#
+# An example agent application
+#
+
+
+if __name__ == '__main__':
+ _notifier = ExampleNotifier()
+ _agent = Agent( "qmf.testAgent", _notifier=_notifier )
+
+ # Dynamically construct a class schema
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
+ # add properties
+ _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # These two properties can be set via the method call
+ _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+
+ # add method
+ _meth = SchemaMethod( _desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ _agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ _obj1 = QmfAgentData( _agent, _schema=_schema )
+ _obj1.set_value("index1", 100)
+ _obj1.set_value("index2", "a name" )
+ _obj1.set_value("set_string", "UNSET")
+ _obj1.set_value("set_int", 0)
+ _obj1.set_value("query_count", 0)
+ _obj1.set_value("method_call_count", 0)
+ _agent.add_object( _obj1 )
+
+ _agent.add_object( QmfAgentData( _agent, _schema=_schema,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+ # add an "unstructured" object to the Agent
+ _obj2 = QmfAgentData(_agent, _object_id="01545")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 2)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _agent.add_object(_obj2)
+
+
+ ## Now connect to the broker
+
+ _c = Connection("localhost")
+ _c.connect()
+ _agent.setConnection(_c)
+
+ _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."})
+
+ _done = False
+ while not _done:
+ # try:
+ _notifier.waitForWork()
+
+ _wi = _agent.get_next_workitem(timeout=0)
+ while _wi:
+
+ if _wi.get_type() == WorkItem.METHOD_CALL:
+ mc = _wi.get_params()
+
+ if mc.get_name() == "set_meth":
+ print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id())
+ print("!!! args='%s'" % str(mc.get_args()))
+ print("!!! userid=%s" % str(mc.get_user_id()))
+ print("!!! handle=%s" % _wi.get_handle())
+ _agent.method_response(_wi.get_handle(),
+ {"rc1": 100, "rc2": "Success"})
+ else:
+ print("!!! Unknown Method name = %s" % mc.get_name())
+ _agent.method_response(_wi.get_handle(), _error=_error_data)
+ else:
+ print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params())))
+
+ _agent.release_workitem(_wi)
+ _wi = _agent.get_next_workitem(timeout=0)
+ # except:
+ # print( "shutting down...")
+ # _done = True
+
+ print( "Removing connection... TBD!!!" )
+ #_myConsole.remove_connection( _c, 10 )
+
+ print( "Destroying agent... TBD!!!" )
+ #_myConsole.destroy( 10 )
+
+ print( "******** agent test done ********" )
+
+
+
diff --git a/qpid/python/qmf2/tests/basic_method.py b/qpid/python/qmf2/tests/basic_method.py
new file mode 100644
index 0000000000..c5098b5d72
--- /dev/null
+++ b/qpid/python/qmf2/tests/basic_method.py
@@ -0,0 +1,348 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, QmfQueryPredicate, WorkItem)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent, MethodCallParams)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+
+ # Dynamically construct a management database
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
+ # add properties
+ _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # These two properties can be set via the method call
+ _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # add method
+ _meth = SchemaMethod( _desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ self.agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ _obj1 = QmfAgentData( self.agent, _schema=_schema )
+ _obj1.set_value("index1", 100)
+ _obj1.set_value("index2", "a name" )
+ _obj1.set_value("set_string", "UNSET")
+ _obj1.set_value("set_int", 0)
+ _obj1.set_value("query_count", 0)
+ _obj1.set_value("method_call_count", 0)
+ self.agent.add_object( _obj1 )
+
+ self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+ # add an "unstructured" object to the Agent
+ _obj2 = QmfAgentData(self.agent, _object_id="01545")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 2)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ self.agent.add_object(_obj2)
+
+ self.running = True
+ self.start()
+
+ def connect_agent(self, broker_url):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(broker_url.host,
+ broker_url.port,
+ broker_url.user,
+ broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+
+ def disconnect_agent(self, timeout):
+ if self.conn:
+ self.agent.remove_connection(timeout)
+
+ def shutdown_agent(self, timeout):
+ self.agent.destroy(timeout)
+
+ def stop(self):
+ self.running = False
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # Agent application main processing loop
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ if wi.get_type() == WorkItem.METHOD_CALL:
+ mc = wi.get_params()
+ if not isinstance(mc, MethodCallParams):
+ raise Exception("Unexpected method call parameters")
+
+ if mc.get_name() == "set_meth":
+ obj = self.agent.get_object(mc.get_object_id())
+ if obj is None:
+ error_info = QmfData.create({"code": -2,
+ "description":
+ "Bad Object Id."})
+ self.agent.method_response(wi.get_handle(),
+ _error=error_info)
+ else:
+ obj.inc_value("method_call_count")
+ if "arg_int" in mc.get_args():
+ obj.set_value("set_int", mc.get_args()["arg_int"])
+ if "arg_str" in mc.get_args():
+ obj.set_value("set_string", mc.get_args()["arg_str"])
+ self.agent.method_response(wi.get_handle(),
+ {"code" : 0})
+ elif mc.get_name() == "a_method":
+ obj = self.agent.get_object(mc.get_object_id())
+ if obj is None:
+ error_info = QmfData.create({"code": -3,
+ "description":
+ "Unknown object id."})
+ self.agent.method_response(wi.get_handle(),
+ _error=error_info)
+ elif obj.get_object_id() != "01545":
+ error_info = QmfData.create({"code": -4,
+ "description":
+ "Unexpected id."})
+ self.agent.method_response(wi.get_handle(),
+ _error=error_info)
+ else:
+ args = mc.get_args()
+ if ("arg1" in args and args["arg1"] == 1 and
+ "arg2" in args and args["arg2"] == "Now set!"
+ and "arg3" in args and args["arg3"] == 1966):
+ self.agent.method_response(wi.get_handle(),
+ {"code" : 0})
+ else:
+ error_info = QmfData.create({"code": -5,
+ "description":
+ "Bad Args."})
+ self.agent.method_response(wi.get_handle(),
+ _error=error_info)
+ else:
+ error_info = QmfData.create({"code": -1,
+ "description":
+ "Unknown method call."})
+ self.agent.method_response(wi.get_handle(), _error=error_info)
+
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent1 = _agentApp("agent1", 1)
+ self.agent1.connect_agent(self.broker)
+ self.agent2 = _agentApp("agent2", 1)
+ self.agent2.connect_agent(self.broker)
+
+ def tearDown(self):
+ if self.agent1:
+ self.agent1.shutdown_agent(10)
+ self.agent1.stop()
+ self.agent1 = None
+ if self.agent2:
+ self.agent2.shutdown_agent(10)
+ self.agent2.stop()
+ self.agent2 = None
+
+ def test_described_obj(self):
+ # create console
+ # find agents
+ # synchronous query for all objects in schema
+ # method call on each object
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
+ QmfQueryPredicate(
+ {QmfQuery.LOGIC_AND:
+ [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]},
+ {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+ "MyPackage"]}]}))
+
+ obj_list = self.console.doQuery(agent, query)
+ self.assertTrue(len(obj_list) == 2)
+ for obj in obj_list:
+ mr = obj.invoke_method( "set_meth", {"arg_int": -99,
+ "arg_str": "Now set!"},
+ _timeout=3)
+ self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult))
+ self.assertTrue(mr.succeeded())
+ self.assertTrue(mr.get_argument("code") == 0)
+
+ self.assertTrue(obj.get_value("method_call_count") == 0)
+ self.assertTrue(obj.get_value("set_string") == "UNSET")
+ self.assertTrue(obj.get_value("set_int") == 0)
+
+ obj.refresh()
+
+ self.assertTrue(obj.get_value("method_call_count") == 1)
+ self.assertTrue(obj.get_value("set_string") == "Now set!")
+ self.assertTrue(obj.get_value("set_int") == -99)
+
+ self.console.destroy(10)
+
+
+ def test_bad_method(self):
+ # create console
+ # find agents
+ # synchronous query for all objects in schema
+ # invalid method call on each object
+ # - should throw a ValueError
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
+ QmfQueryPredicate(
+ {QmfQuery.LOGIC_AND:
+ [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]},
+ {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+ "MyPackage"]}]}))
+
+ obj_list = self.console.doQuery(agent, query)
+ self.assertTrue(len(obj_list) == 2)
+ for obj in obj_list:
+ self.failUnlessRaises(ValueError,
+ obj.invoke_method,
+ "unknown_meth",
+ {"arg1": -99, "arg2": "Now set!"},
+ _timeout=3)
+ self.console.destroy(10)
+
+
+ def test_managed_obj(self):
+ # create console
+ # find agents
+ # synchronous query for a managed object
+ # method call on each object
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545")
+ obj_list = self.console.doQuery(agent, query)
+
+ self.assertTrue(isinstance(obj_list, type([])))
+ self.assertTrue(len(obj_list) == 1)
+ obj = obj_list[0]
+
+ mr = obj.invoke_method("a_method",
+ {"arg1": 1,
+ "arg2": "Now set!",
+ "arg3": 1966},
+ _timeout=3)
+ self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult))
+ self.assertTrue(mr.succeeded())
+ self.assertTrue(mr.get_argument("code") == 0)
+ # @todo refresh and verify changes
+
+ self.console.destroy(10)
diff --git a/qpid/python/qmf2/tests/basic_query.py b/qpid/python/qmf2/tests/basic_query.py
new file mode 100644
index 0000000000..46dc87f4a1
--- /dev/null
+++ b/qpid/python/qmf2/tests/basic_query.py
@@ -0,0 +1,336 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, QmfQueryPredicate)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+
+ # Dynamically construct a management database
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
+ # add properties
+ _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # These two properties can be set via the method call
+ _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # add method
+ _meth = SchemaMethod( _desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ self.agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ _obj1 = QmfAgentData( self.agent, _schema=_schema )
+ _obj1.set_value("index1", 100)
+ _obj1.set_value("index2", "a name" )
+ _obj1.set_value("set_string", "UNSET")
+ _obj1.set_value("set_int", 0)
+ _obj1.set_value("query_count", 0)
+ _obj1.set_value("method_call_count", 0)
+ self.agent.add_object( _obj1 )
+
+ self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+ # add an "unstructured" object to the Agent
+ _obj2 = QmfAgentData(self.agent, _object_id="01545")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 2)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ self.agent.add_object(_obj2)
+
+ self.running = True
+ self.start()
+
+ def connect_agent(self, broker_url):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(broker_url.host,
+ broker_url.port,
+ broker_url.user,
+ broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+
+ def disconnect_agent(self, timeout):
+ if self.conn:
+ self.agent.remove_connection(timeout)
+
+ def shutdown_agent(self, timeout):
+ self.agent.destroy(timeout)
+
+ def stop(self):
+ self.running = False
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent1 = _agentApp("agent1", 1)
+ self.agent1.connect_agent(self.broker)
+ self.agent2 = _agentApp("agent2", 1)
+ self.agent2.connect_agent(self.broker)
+
+ def tearDown(self):
+ if self.agent1:
+ self.agent1.shutdown_agent(10)
+ self.agent1.stop()
+ self.agent1 = None
+ if self.agent2:
+ self.agent2.shutdown_agent(10)
+ self.agent2.stop()
+ self.agent2 = None
+
+ def test_all_oids(self):
+ # create console
+ # find agents
+ # synchronous query for all objects by id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
+ oid_list = self.console.doQuery(agent, query)
+
+ self.assertTrue(isinstance(oid_list, type([])),
+ "Unexpected return type")
+ self.assertTrue(len(oid_list) == 3, "Wrong count")
+ self.assertTrue('100a name' in oid_list)
+ self.assertTrue('99another name' in oid_list)
+ self.assertTrue('01545' in oid_list)
+
+ self.console.destroy(10)
+
+
+ def test_direct_oids(self):
+ # create console
+ # find agents
+ # synchronous query for each objects
+ # verify objects and schemas are correct
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ for oid in ['100a name', '99another name', '01545']:
+ query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, oid)
+ obj_list = self.console.doQuery(agent, query)
+
+ self.assertTrue(isinstance(obj_list, type([])),
+ "Unexpected return type")
+ self.assertTrue(len(obj_list) == 1)
+ obj = obj_list[0]
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == oid)
+
+ if obj.is_described():
+ self.assertTrue(oid in ['100a name', '99another name'])
+ schema_id = obj.get_schema_class_id()
+ self.assertTrue(isinstance(schema_id, SchemaClassId))
+ else:
+ self.assertTrue(oid == "01545")
+
+
+
+ self.console.destroy(10)
+
+
+
+ def test_packages(self):
+ # create console
+ # find agents
+ # synchronous query all package names
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
+ package_list = self.console.doQuery(agent, query)
+ self.assertTrue(len(package_list) == 1)
+ self.assertTrue('MyPackage' in package_list)
+
+
+ self.console.destroy(10)
+
+
+
+ def test_predicate_schema_id(self):
+ # create console
+ # find agents
+ # synchronous query for all schema by package name
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+ QmfQueryPredicate(
+ {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+ "MyPackage"]}))
+
+ schema_list = self.console.doQuery(agent, query)
+ self.assertTrue(len(schema_list))
+ for schema in schema_list:
+ self.assertTrue(schema.get_class_id().get_package_name() ==
+ "MyPackage")
+
+
+ self.console.destroy(10)
+
+
+
+ def test_predicate_no_match(self):
+ # create console
+ # find agents
+ # synchronous query for all schema by package name
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+ QmfQueryPredicate(
+ {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+ "No-Such-Package"]}))
+
+ schema_list = self.console.doQuery(agent, query)
+ self.assertTrue(len(schema_list) == 0)
+
+ self.console.destroy(10)
+
+
diff --git a/qpid/python/qmf2/tests/console_test.py b/qpid/python/qmf2/tests/console_test.py
new file mode 100644
index 0000000000..ac0e064f20
--- /dev/null
+++ b/qpid/python/qmf2/tests/console_test.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import logging
+import time
+from threading import Semaphore
+
+
+from qpid.messaging import *
+from qmf2.common import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey,
+ SchemaClassId, SchemaClass, QmfData)
+from qmf2.console import Console
+
+
+class ExampleNotifier(Notifier):
+ def __init__(self):
+ self._sema4 = Semaphore(0) # locked
+
+ def indication(self):
+ self._sema4.release()
+
+ def waitForWork(self):
+ print("Waiting for event...")
+ self._sema4.acquire()
+ print("...event present")
+
+
+logging.getLogger().setLevel(logging.INFO)
+
+print( "Starting Connection" )
+_c = Connection("localhost")
+_c.connect()
+
+print( "Starting Console" )
+
+_notifier = ExampleNotifier()
+_myConsole = Console(notifier=_notifier)
+_myConsole.addConnection( _c )
+
+# Allow discovery only for the agent named "qmf.testAgent"
+# @todo: replace "manual" query construction with
+# a formal class-based Query API
+_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
+ QmfQueryPredicate({QmfQuery.CMP_EQ:
+ [QmfQuery.KEY_AGENT_NAME,
+ "qmf.testAgent"]}))
+_myConsole.enable_agent_discovery(_query)
+
+_done = False
+while not _done:
+# try:
+ _notifier.waitForWork()
+
+ _wi = _myConsole.get_next_workitem(timeout=0)
+ while _wi:
+ print("!!! work item received %d:%s" % (_wi.get_type(),
+ str(_wi.get_params())))
+
+
+ if _wi.get_type() == _wi.AGENT_ADDED:
+ _agent = _wi.get_params().get("agent")
+ if not _agent:
+ print("!!!! AGENT IN REPLY IS NULL !!! ")
+
+ _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
+ oid_list = _myConsole.doQuery(_agent, _query)
+
+ print("!!!************************** REPLY=%s" % oid_list)
+
+ for oid in oid_list:
+ _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
+ oid)
+ obj_list = _myConsole.doQuery(_agent, _query)
+
+ print("!!!************************** REPLY=%s" % obj_list)
+
+ if obj_list is None:
+ obj_list={}
+
+ for obj in obj_list:
+ resp = obj.invoke_method( "set_meth",
+ {"arg_int": -11,
+ "arg_str": "are we not goons?"},
+ None,
+ 3)
+ if resp is None:
+ print("!!!*** NO RESPONSE FROM METHOD????")
+ else:
+ print("!!! method succeeded()=%s" % resp.succeeded())
+ print("!!! method exception()=%s" % resp.get_exception())
+ print("!!! method get args() = %s" % resp.get_arguments())
+
+ if not obj.is_described():
+ resp = obj.invoke_method( "bad method",
+ {"arg_int": -11,
+ "arg_str": "are we not goons?"},
+ None,
+ 3)
+ if resp is None:
+ print("!!!*** NO RESPONSE FROM METHOD????")
+ else:
+ print("!!! method succeeded()=%s" % resp.succeeded())
+ print("!!! method exception()=%s" % resp.get_exception())
+ print("!!! method get args() = %s" % resp.get_arguments())
+
+
+ #---------------------------------
+ #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name")
+
+ #obj_list = _myConsole.doQuery(_agent, _query)
+
+ #---------------------------------
+
+ # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
+
+ # package_list = _myConsole.doQuery(_agent, _query)
+
+ # for pname in package_list:
+ # print("!!! Querying for schema from package: %s" % pname)
+ # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID,
+ # QmfQueryPredicate(
+ # {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]}))
+
+ # schema_id_list = _myConsole.doQuery(_agent, _query)
+ # for sid in schema_id_list:
+ # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+ # QmfQueryPredicate(
+ # {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID,
+ # sid.map_encode()]}))
+
+ # schema_list = _myConsole.doQuery(_agent, _query)
+ # for schema in schema_list:
+ # sid = schema.get_class_id()
+ # _query = QmfQuery.create_predicate(
+ # QmfQuery.TARGET_OBJECT_ID,
+ # QmfQueryPredicate({QmfQuery.CMP_EQ:
+ # [QmfData.KEY_SCHEMA_ID,
+ # sid.map_encode()]}))
+
+ # oid_list = _myConsole.doQuery(_agent, _query)
+ # for oid in oid_list:
+ # _query = QmfQuery.create_id(
+ # QmfQuery.TARGET_OBJECT, oid)
+ # _reply = _myConsole.doQuery(_agent, _query)
+
+ # print("!!!************************** REPLY=%s" % _reply)
+
+
+ _myConsole.release_workitem(_wi)
+ _wi = _myConsole.get_next_workitem(timeout=0)
+# except:
+# logging.info( "shutting down..." )
+# _done = True
+
+print( "Removing connection" )
+_myConsole.removeConnection( _c, 10 )
+
+print( "Destroying console:" )
+_myConsole.destroy( 10 )
+
+print( "******** console test done ********" )
diff --git a/qpid/python/qmf2/tests/events.py b/qpid/python/qmf2/tests/events.py
new file mode 100644
index 0000000000..8ce534ce3a
--- /dev/null
+++ b/qpid/python/qmf2/tests/events.py
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import time
+import datetime
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, QmfQueryPredicate, SchemaEventClass,
+ QmfEvent)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, broker_url, heartbeat):
+ Thread.__init__(self)
+ self.timeout = 3
+ self.broker_url = broker_url
+ self.notifier = _testNotifier()
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+
+ # Dynamically construct a management database
+
+ _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage",
+ "MyClass",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A test event schema")
+ # add properties
+ _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # Add schema to Agent
+ self.schema = _schema
+ self.agent.register_object_class(_schema)
+
+ self.running = False
+
+ def start_app(self):
+ self.running = True
+ self.start()
+
+ def stop_app(self):
+ self.running = False
+ # wake main thread
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(self.timeout)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ conn.connect()
+ self.agent.set_connection(conn)
+
+ counter = 1
+ while self.running:
+ # post an event every second
+ event = QmfEvent.create(long(time.time() * 1000),
+ QmfEvent.SEV_WARNING,
+ {"prop-1": counter,
+ "prop-2": str(datetime.datetime.utcnow())},
+ _schema=self.schema)
+ counter += 1
+ self.agent.raise_event(event)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+ self.notifier.wait_for_work(1)
+
+ self.agent.remove_connection(self.timeout)
+ self.agent.destroy(self.timeout)
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent1 = _agentApp("agent1", self.broker, 1)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, 1)
+ self.agent2.start_app()
+
+ def tearDown(self):
+ if self.agent1:
+ self.agent1.stop_app()
+ self.agent1 = None
+ if self.agent2:
+ self.agent2.stop_app()
+ self.agent2 = None
+
+ def test_get_events(self):
+ # create console
+ # find agents
+
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ # find the agents
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now wait for events
+ agent1_events = agent2_events = 0
+ wi = self.console.get_next_workitem(timeout=4)
+ while wi:
+ if wi.get_type() == wi.EVENT_RECEIVED:
+ event = wi.get_params().get("event")
+ self.assertTrue(isinstance(event, QmfEvent))
+ self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING)
+ self.assertTrue(event.get_value("prop-1") > 0)
+
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_events += 1
+ elif agent.get_name() == "agent2":
+ agent2_events += 1
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+ if agent1_events and agent2_events:
+ break;
+
+ wi = self.console.get_next_workitem(timeout=4)
+
+ self.assertTrue(agent1_events > 0 and agent2_events > 0)
+
+ self.console.destroy(10)
+
+
+
+
diff --git a/qpid/python/qmf2/tests/obj_gets.py b/qpid/python/qmf2/tests/obj_gets.py
new file mode 100644
index 0000000000..e58575440d
--- /dev/null
+++ b/qpid/python/qmf2/tests/obj_gets.py
@@ -0,0 +1,399 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, QmfQueryPredicate)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+
+ # Management Database
+ # - two different schema packages,
+ # - two classes within one schema package
+ # - multiple objects per schema package+class
+ # - two "undescribed" objects
+
+ # "package1/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"),
+ _desc="A test data schema - one",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p1c1_key1")
+ _obj.set_value("count1", 0)
+ _obj.set_value("count2", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p1c1_key2")
+ _obj.set_value("count1", 9)
+ _obj.set_value("count2", 10)
+ self.agent.add_object( _obj )
+
+ # "package1/class2"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"),
+ _desc="A test data schema - two",
+ _object_id_names=["name"] )
+ # add properties
+ _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("name", "p1c2_name1")
+ _obj.set_value("string1", "a data string")
+ self.agent.add_object( _obj )
+
+
+ # "package2/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"),
+ _desc="A test data schema - second package",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p2c1_key1")
+ _obj.set_value("counter", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent, _schema=_schema )
+ _obj.set_value("key", "p2c1_key2")
+ _obj.set_value("counter", 2112)
+ self.agent.add_object( _obj )
+
+
+ # add two "unstructured" objects to the Agent
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-1")
+ _obj.set_value("field1", "a value")
+ _obj.set_value("field2", 2)
+ _obj.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj.set_value("field4", ["a", "list", "value"])
+ self.agent.add_object(_obj)
+
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-2")
+ _obj.set_value("key-1", "a value")
+ _obj.set_value("key-2", 2)
+ self.agent.add_object(_obj)
+
+ self.running = True
+ self.start()
+
+ def connect_agent(self, broker_url):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(broker_url.host,
+ broker_url.port,
+ broker_url.user,
+ broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+
+ def disconnect_agent(self, timeout):
+ if self.conn:
+ self.agent.remove_connection(timeout)
+
+ def shutdown_agent(self, timeout):
+ self.agent.destroy(timeout)
+
+ def stop(self):
+ self.running = False
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+ agent_count = 5
+
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ self.agents = []
+ for i in range(self.agent_count):
+ agent = _agentApp("agent-" + str(i), 1)
+ agent.connect_agent(self.broker)
+ self.agents.append(agent)
+
+ def tearDown(self):
+ for agent in self.agents:
+ if agent is not None:
+ agent.shutdown_agent(10)
+ agent.stop()
+
+
+ def test_all_agents(self):
+ # create console
+ # find all agents
+ # synchronous query for all objects by id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # console has discovered all agents, now query all undesc-2 objects
+ objs = self.console.get_objects(_object_id="undesc-2", _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_object_id() == "undesc-2")
+
+ # now query all objects from schema "package1"
+ objs = self.console.get_objects(_pname="package1", _timeout=5)
+ self.assertTrue(len(objs) == (self.agent_count * 3))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+ # now query all objects from schema "package2"
+ objs = self.console.get_objects(_pname="package2", _timeout=5)
+ self.assertTrue(len(objs) == (self.agent_count * 2))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+ # now query all objects from schema "package1/class2"
+ objs = self.console.get_objects(_pname="package1", _cname="class2", _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+ # given the schema identifier from the last query, repeat using the
+ # specific schema id
+ schema_id = objs[0].get_schema_class_id()
+ objs = self.console.get_objects(_schema_id=schema_id, _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+ self.console.destroy(10)
+
+
+
+ def test_agent_subset(self):
+ # create console
+ # find all agents
+ # synchronous query for all objects by id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ agent_list = []
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+ agent_list.append(agent)
+
+ # Only use a subset of the agents:
+ agent_list = agent_list[:len(agent_list)/2]
+
+ # console has discovered all agents, now query all undesc-2 objects
+ objs = self.console.get_objects(_object_id="undesc-2",
+ _agents=agent_list, _timeout=5)
+ self.assertTrue(len(objs) == len(agent_list))
+ for obj in objs:
+ self.assertTrue(obj.get_object_id() == "undesc-2")
+
+ # now query all objects from schema "package1"
+ objs = self.console.get_objects(_pname="package1",
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == (len(agent_list) * 3))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+ # now query all objects from schema "package2"
+ objs = self.console.get_objects(_pname="package2",
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == (len(agent_list) * 2))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+ # now query all objects from schema "package1/class2"
+ objs = self.console.get_objects(_pname="package1", _cname="class2",
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == len(agent_list))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+ # given the schema identifier from the last query, repeat using the
+ # specific schema id
+ schema_id = objs[0].get_schema_class_id()
+ objs = self.console.get_objects(_schema_id=schema_id,
+ _agents=agent_list,
+ _timeout=5)
+ self.assertTrue(len(objs) == len(agent_list))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+ self.console.destroy(10)
+
+
+
+ def test_single_agent(self):
+ # create console
+ # find all agents
+ # synchronous query for all objects by id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ agent_list = []
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+ agent_list.append(agent)
+
+ # Only use one agetn
+ agent = agent_list[0]
+
+ # console has discovered all agents, now query all undesc-2 objects
+ objs = self.console.get_objects(_object_id="undesc-2",
+ _agents=agent, _timeout=5)
+ self.assertTrue(len(objs) == 1)
+ for obj in objs:
+ self.assertTrue(obj.get_object_id() == "undesc-2")
+
+ # now query all objects from schema "package1"
+ objs = self.console.get_objects(_pname="package1",
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 3)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+ # now query all objects from schema "package2"
+ objs = self.console.get_objects(_pname="package2",
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 2)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+ # now query all objects from schema "package1/class2"
+ objs = self.console.get_objects(_pname="package1", _cname="class2",
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 1)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+ # given the schema identifier from the last query, repeat using the
+ # specific schema id
+ schema_id = objs[0].get_schema_class_id()
+ objs = self.console.get_objects(_schema_id=schema_id,
+ _agents=agent,
+ _timeout=5)
+ self.assertTrue(len(objs) == 1)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+ self.console.destroy(10)
+