diff options
-rw-r--r-- | python/qmf2/agent.py | 90 | ||||
-rw-r--r-- | python/qmf2/common.py | 84 | ||||
-rw-r--r-- | python/qmf2/console.py | 170 | ||||
-rw-r--r-- | python/qmf2/tests/basic_method.py | 117 | ||||
-rw-r--r-- | python/qmf2/tests/basic_query.py | 137 | ||||
-rw-r--r-- | python/qmf2/tests/events.py | 2 | ||||
-rw-r--r-- | python/qmf2/tests/obj_gets.py | 119 |
7 files changed, 549 insertions, 170 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py index 7c090ad36b..fd63807dc3 100644 --- a/python/qmf2/agent.py +++ b/python/qmf2/agent.py @@ -26,7 +26,8 @@ from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 from common import (make_subject, parse_subject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, WorkItem, SchemaMethod) + SchemaClass, SchemaClassId, WorkItem, SchemaMethod, + timedelta_to_secs) # global flag that indicates which thread (if any) is # running the agent notifier callback @@ -42,18 +43,22 @@ class _MethodCallHandle(object): application. Given to the app in a WorkItem, provided to the agent when method_response() is invoked. """ - def __init__(self, correlation_id, reply_to, meth_name, _oid=None): + def __init__(self, correlation_id, reply_to, meth_name, _oid=None, + _schema_id=None): self.correlation_id = correlation_id self.reply_to = reply_to self.meth_name = meth_name self.oid = _oid + self.schema_id = _schema_id class MethodCallParams(object): """ """ - def __init__(self, name, _oid=None, _in_args=None, _user_id=None): + def __init__(self, name, _oid=None, _schema_id=None, _in_args=None, + _user_id=None): self._meth_name = name self._oid = _oid + self._schema_id = _schema_id self._in_args = _in_args self._user_id = _user_id @@ -63,6 +68,9 @@ class MethodCallParams(object): def get_object_id(self): return self._oid + def get_schema_id(self): + return self._schema_id + def get_args(self): return self._in_args @@ -100,7 +108,12 @@ class Agent(Thread): self._packages = {} self._schema_timestamp = long(0) self._schema = {} - self._agent_data = {} + # _described_data holds QmfData objects that are associated with schema + # it is index by schema_id, object_id + self._described_data = {} + # _undescribed_data holds unstructured QmfData objects - these objects + # have no schema. it is indexed by object_id only. + self._undescribed_data = {} self._work_q = Queue.Queue() self._work_q_put = False @@ -247,7 +260,7 @@ class Agent(Thread): # (self.name, str(msg))) self._topic_sender.send(msg) - def add_object(self, data ): + def add_object(self, data): """ Register an instance of a QmfAgentData object. """ @@ -256,20 +269,34 @@ class Agent(Thread): if not isinstance(data, QmfAgentData): raise TypeError("QmfAgentData instance expected") - id_ = data.get_object_id() - if not id_: + oid = data.get_object_id() + if not oid: raise TypeError("No identifier assigned to QmfAgentData!") + sid = data.get_schema_class_id() + self._lock.acquire() try: - self._agent_data[id_] = data + if sid: + if sid not in self._described_data: + self._described_data[sid] = {oid: data} + else: + self._described_data[sid][oid] = data + else: + self._undescribed_data[oid] = data finally: self._lock.release() - def get_object(self, id): + def get_object(self, oid, schema_id): + data = None self._lock.acquire() try: - data = self._agent_data.get(id) + if schema_id: + data = self._described_data.get(schema_id) + if data: + data = data.get(oid) + else: + data = self._undescribed_data.get(oid) finally: self._lock.release() return data @@ -284,6 +311,8 @@ class Agent(Thread): _map = {SchemaMethod.KEY_NAME:handle.meth_name} if handle.oid is not None: _map[QmfData.KEY_OBJECT_ID] = handle.oid + if handle.schema_id is not None: + _map[QmfData.KEY_SCHEMA_ID] = handle.schema_id.map_encode() if _out_args is not None: _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy() if _error is not None: @@ -340,7 +369,7 @@ class Agent(Thread): logging.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) - timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds + timeout = timedelta_to_secs(next_heartbeat - now) # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) try: self._session.next_receiver(timeout=timeout) @@ -519,11 +548,14 @@ class Agent(Thread): in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) oid = cmap.get(QmfData.KEY_OBJECT_ID) + schema_id = cmap.get(QmfData.KEY_SCHEMA_ID) + if schema_id: + schema_id = SchemaClassId.from_map(schema_id) handle = _MethodCallHandle(msg.correlation_id, msg.reply_to, mname, - oid) - param = MethodCallParams( mname, oid, in_args, msg.user_id) + oid, schema_id) + param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id) self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) self._work_q_put = True @@ -594,12 +626,24 @@ class Agent(Thread): """ """ data_objs = [] + # extract optional schema_id from target params + sid = None + t_params = query.get_target_param() + if t_params: + sid = t_params.get(QmfData.KEY_SCHEMA_ID) + # if querying for a specific object, do a direct lookup if query.get_selector() == QmfQuery.ID: + oid = query.get_id() found = None self._lock.acquire() try: - found = self._agent_data.get(query.get_id()) + if sid: + found = self._described_data.get(sid) + if found: + found = found.get(oid) + else: + found = self._undescribed_data.get(oid) finally: self._lock.release() if found: @@ -610,12 +654,18 @@ class Agent(Thread): else: # otherwise, evaluate all data self._lock.acquire() try: - for oid,val in self._agent_data.iteritems(): - if query.evaluate(val): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(val.map_encode()) + if sid: + db = self._described_data.get(sid) + else: + db = self._undescribed_data + + if db: + for oid,val in db.iteritems(): + if query.evaluate(val): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(val.map_encode()) finally: self._lock.release() diff --git a/python/qmf2/common.py b/python/qmf2/common.py index affc49f85b..fabe24b30b 100644 --- a/python/qmf2/common.py +++ b/python/qmf2/common.py @@ -90,6 +90,12 @@ def parse_subject(_sub): return _sub[3:].split('.', 1) +def timedelta_to_secs(td): + """ + Convert a time delta to a time interval in seconds (float) + """ + return td.days * 86400 + td.seconds + td.microseconds/1000000.0 + ##============================================================================== ## Async Event Model @@ -906,8 +912,10 @@ class QmfQuery(_mapEncoder): TARGET_SCHEMA_ID="schema_id" TARGET_SCHEMA="schema" - # allowed predicate key(s): + # allowed id: value: + # SchemaClassId # + # allowed predicate key(s): # SchemaClassId.KEY_PACKAGE # SchemaClassId.KEY_CLASS # SchemaClassId.KEY_TYPE @@ -917,19 +925,22 @@ class QmfQuery(_mapEncoder): # name of method (exist test only) TARGET_AGENT="agent" + # allowed id: value: + # string name of agent # allowed predicate keys(s): - # + # KEY_AGENT_NAME="_name" TARGET_OBJECT_ID="object_id" TARGET_OBJECT="object" + # If object is described by a schema, the value of the target map must + # include a "_schema_id": {map encoded schema id} value. + # + # allowed id: value: + # object_id string + # # allowed predicate keys(s): # - # SchemaClassId.KEY_PACKAGE - # SchemaClassId.KEY_CLASS - # SchemaClassId.KEY_TYPE - # SchemaClassId.KEY_HASH - # QmfData.KEY_SCHEMA_ID # QmfData.KEY_OBJECT_ID # QmfData.KEY_UPDATE_TS # QmfData.KEY_CREATE_TS @@ -977,8 +988,23 @@ class QmfQuery(_mapEncoder): if key in self._valid_targets: _target = key break + if _target is None: + raise TypeError("Invalid QmfQuery target: '%s'" % + str(target_map)) + # convert target params from map format _target_params = target_map.get(_target) + if _target_params: + if not isinstance(_target_params, type({})): + raise TypeError("target params must be a map: '%s'" % + str(_target_params)) + t_params = {} + for name,value in _target_params.iteritems(): + if name == QmfData.KEY_SCHEMA_ID: + t_params[name] = SchemaClassId.from_map(value) + else: + t_params[name] = value + _target_params = t_params _id = _map.get(self.KEY_ID) if _id is not None: @@ -1009,9 +1035,40 @@ class QmfQuery(_mapEncoder): return cls(_target=target, _target_params=_target_params, _id=ident) create_id = classmethod(_create_id) + def _create_id_object(cls, object_id, _schema_id=None): + """ + Create a ID Query for an object (schema optional). + """ + if _schema_id is not None: + if not isinstance(_schema_id, SchemaClassId): + raise TypeError("class SchemaClassId expected") + params = {QmfData.KEY_SCHEMA_ID: _schema_id} + else: + params = None + return cls(_target=QmfQuery.TARGET_OBJECT, + _id=object_id, + _target_params=params) + create_id_object = classmethod(_create_id_object) + + def _create_id_object_id(cls, object_id, _schema_id=None): + """ + Create a ID Query for object_ids (schema optional). + """ + if _schema_id is not None: + if not isinstance(_schema_id, SchemaClassId): + raise TypeError("class SchemaClassId expected") + params = {QmfData.KEY_SCHEMA_ID: _schema_id} + else: + params = None + return cls(_target=QmfQuery.TARGET_OBJECT_ID, + _id=object_id, + _target_params=params) + create_id_object_id = classmethod(_create_id_object_id) + def _from_map(cls, map_): return cls(_map=map_) from_map = classmethod(_from_map) + # end constructors def get_target(self): return self._target @@ -1055,7 +1112,18 @@ class QmfQuery(_mapEncoder): return True def map_encode(self): - _map = {self.KEY_TARGET: {self._target: self._target_params}} + t_params = {} + if self._target_params: + for name,value in self._target_params.iteritems(): + if isinstance(value, _mapEncoder): + t_params[name] = value.map_encode() + else: + t_params[name] = value + if t_params: + _map = {self.KEY_TARGET: {self._target: t_params}} + else: + _map = {self.KEY_TARGET: {self._target: None}} + if self._id is not None: if isinstance(self._id, _mapEncoder): _map[self.KEY_ID] = self._id.map_encode() diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 84209e255c..e08abc007c 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -31,9 +31,9 @@ from threading import Condition from qpid.messaging import Connection, Message, Empty, SendError from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, - MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, SchemaEventClass, - SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent) + MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, + SchemaEventClass, SchemaObjectClass, WorkItem, + SchemaMethod, QmfEvent, timedelta_to_secs) # global flag that indicates which thread (if any) is @@ -249,11 +249,9 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - # create query to agent using this objects ID - oid = self.get_object_id() - query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, - self.get_object_id()) + query = QmfQuery.create_id_object(self.get_object_id(), + self.get_schema_class_id()) obj_list = self._agent._console.do_query(self._agent, query, timeout=_timeout) if obj_list is None or len(obj_list) != 1: @@ -309,6 +307,10 @@ class QmfConsoleData(QmfData): _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} + + sid = self.get_schema_class_id() + if sid: + _map[self.KEY_SCHEMA_ID] = sid.map_encode() if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args @@ -969,7 +971,7 @@ class Console(Thread): # to expire now = datetime.datetime.utcnow() if self._next_agent_expire > now: - timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds + timeout = timedelta_to_secs(self._next_agent_expire - now) try: logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) xxx = self._session.next_receiver(timeout = timeout) @@ -980,34 +982,20 @@ class Console(Thread): logging.debug("Shutting down Console thread") def get_objects(self, + _object_id=None, _schema_id=None, _pname=None, _cname=None, - _object_id=None, _agents=None, _timeout=None): """ - @todo - """ - if _object_id is not None: - # query by object id - query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id) - elif _schema_id is not None: - pred = [QmfQuery.EQ, QmfData.KEY_SCHEMA_ID, _schema_id.map_encode()] - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) - elif _pname is not None: - # query by package name (and maybe class name) - if _cname is not None: - pred = [QmfQuery.AND, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, _pname]], - [QmfQuery.EQ, SchemaClassId.KEY_CLASS, - [QmfQuery.QUOTE, _cname]]] - else: - pred = [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, _pname]] - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred) - else: - raise Exception("invalid arguments") + Retrieve objects by id or schema. + By object_id: must specify schema_id or pname & cname if object defined + by a schema. Undescribed objects: only object_id needed. + + By schema: must specify schema_id or pname & cname - all instances of + objects defined by that schema are returned. + """ if _agents is None: # use copy of current agent list self._lock.acquire() @@ -1021,12 +1009,12 @@ class Console(Thread): agent_list = _agents # @todo validate this list! - # @todo: fix when async do_query done - query all agents at once, then - # wait for replies, instead of per-agent querying.... - if _timeout is None: _timeout = self._reply_timeout + # @todo: fix when async do_query done - query all agents at once, then + # wait for replies, instead of per-agent querying.... + obj_list = [] expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) for agent in agent_list: @@ -1035,11 +1023,54 @@ class Console(Thread): now = datetime.datetime.utcnow() if now >= expired: break - timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds - reply = self.do_query(agent, query, timeout) - if reply: - obj_list = obj_list + reply + if _pname is None: + if _object_id: + query = QmfQuery.create_id_object(_object_id, + _schema_id) + else: + if _schema_id is not None: + t_params = {QmfData.KEY_SCHEMA_ID: _schema_id} + else: + t_params = None + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + t_params) + timeout = timedelta_to_secs(expired - now) + reply = self.do_query(agent, query, timeout) + if reply: + obj_list = obj_list + reply + else: + # looking up by package name (and maybe class name), need to + # find all schema_ids in that package, then lookup object by + # schema_id + if _cname is not None: + pred = [QmfQuery.AND, + [QmfQuery.EQ, + SchemaClassId.KEY_PACKAGE, + [QmfQuery.QUOTE, _pname]], + [QmfQuery.EQ, SchemaClassId.KEY_CLASS, + [QmfQuery.QUOTE, _cname]]] + else: + pred = [QmfQuery.EQ, + SchemaClassId.KEY_PACKAGE, + [QmfQuery.QUOTE, _pname]] + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) + timeout = timedelta_to_secs(expired - now) + sid_list = self.do_query(agent, query, timeout) + if sid_list: + for sid in sid_list: + now = datetime.datetime.utcnow() + if now >= expired: + break + if _object_id is not None: + query = QmfQuery.create_id_object(_object_id, sid) + else: + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) + timeout = timedelta_to_secs(expired - now) + reply = self.do_query(agent, query, timeout) + if reply: + obj_list = obj_list + reply if obj_list: return obj_list return None @@ -1103,55 +1134,44 @@ class Console(Thread): " '%s'" % msg) return - ignore = True - matched = False correlated = False - agent_query = self._agent_discovery_filter - if msg.correlation_id: correlated = self._req_correlation.is_valid(msg.correlation_id) - if direct and correlated: - ignore = False - elif agent_query: - matched = agent_query.evaluate(QmfData.create(values=ai_map)) - ignore = not matched + agent = None + self._lock.acquire() + try: + agent = self._agent_map.get(name) + if agent: + # agent already known, just update timestamp + agent._announce_timestamp = datetime.datetime.utcnow() + finally: + self._lock.release() - if not ignore: - agent = None - self._lock.acquire() - try: - agent = self._agent_map.get(name) - finally: - self._lock.release() + if not agent: + # need to create and add a new agent? + matched = False + if self._agent_discovery_filter: + tmp = QmfData.create(values=ai_map) + matched = self._agent_discovery_filter.evaluate(tmp) - if not agent: - # need to create and add a new agent + if (correlated or matched): agent = self._create_agent(name) if not agent: return # failed to add agent - - # lock out expiration scanning code - self._lock.acquire() - try: - old_timestamp = agent._announce_timestamp agent._announce_timestamp = datetime.datetime.utcnow() - finally: - self._lock.release() - - if old_timestamp == None and matched: - logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) - wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) - self._work_q.put(wi) - self._work_q_put = True - - if correlated: - # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) - - + if matched: + # unsolicited, but newly discovered + logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) + wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) + self._work_q.put(wi) + self._work_q_put = True + + if correlated: + # wake up all waiters + logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) def _handle_data_ind_msg(self, msg, cmap, version, direct): """ diff --git a/python/qmf2/tests/basic_method.py b/python/qmf2/tests/basic_method.py index a1847efee9..745fa5d83c 100644 --- a/python/qmf2/tests/basic_method.py +++ b/python/qmf2/tests/basic_method.py @@ -148,7 +148,8 @@ class _agentApp(Thread): raise Exception("Unexpected method call parameters") if mc.get_name() == "set_meth": - obj = self.agent.get_object(mc.get_object_id()) + obj = self.agent.get_object(mc.get_object_id(), + mc.get_schema_id()) if obj is None: error_info = QmfData.create({"code": -2, "description": @@ -164,7 +165,8 @@ class _agentApp(Thread): self.agent.method_response(wi.get_handle(), {"code" : 0}) elif mc.get_name() == "a_method": - obj = self.agent.get_object(mc.get_object_id()) + obj = self.agent.get_object(mc.get_object_id(), + mc.get_schema_id()) if obj is None: error_info = QmfData.create({"code": -3, "description": @@ -246,38 +248,82 @@ class BaseTest(unittest.TestCase): agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, SchemaClassId.KEY_PACKAGE]], - [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, "MyPackage"]]]) + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == 1) + for sid in sid_list: + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + _target_params=t_params) + obj_list = self.console.do_query(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + mr = obj.invoke_method( "set_meth", {"arg_int": -99, + "arg_str": "Now set!"}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) + self.assertTrue(mr.succeeded()) + self.assertTrue(mr.get_argument("code") == 0) + + self.assertTrue(obj.get_value("method_call_count") == 0) + self.assertTrue(obj.get_value("set_string") == "UNSET") + self.assertTrue(obj.get_value("set_int") == 0) + + obj.refresh() + + self.assertTrue(obj.get_value("method_call_count") == 1) + self.assertTrue(obj.get_value("set_string") == "Now set!") + self.assertTrue(obj.get_value("set_int") == -99) - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) - for obj in obj_list: - mr = obj.invoke_method( "set_meth", {"arg_int": -99, - "arg_str": "Now set!"}, - _timeout=3) - self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) - self.assertTrue(mr.succeeded()) - self.assertTrue(mr.get_argument("code") == 0) + self.console.destroy(10) - self.assertTrue(obj.get_value("method_call_count") == 0) - self.assertTrue(obj.get_value("set_string") == "UNSET") - self.assertTrue(obj.get_value("set_int") == 0) - obj.refresh() + def test_bad_method_schema(self): + # create console + # find agents + # synchronous query for all objects with schema + # invalid method call on each object + # - should throw a ValueError + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) - self.assertTrue(obj.get_value("method_call_count") == 1) - self.assertTrue(obj.get_value("set_string") == "Now set!") - self.assertTrue(obj.get_value("set_int") == -99) + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == 1) + for sid in sid_list: + + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, + [QmfQuery.TRUE], + _target_params=t_params) + + obj_list = self.console.do_query(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + self.failUnlessRaises(ValueError, + obj.invoke_method, + "unknown_meth", + {"arg1": -99, "arg2": "Now set!"}, + _timeout=3) self.console.destroy(10) - - def test_bad_method(self): + def test_bad_method_no_schema(self): # create console # find agents - # synchronous query for all objects in schema + # synchronous query for all objects with no schema # invalid method call on each object # - should throw a ValueError self.notifier = _testNotifier() @@ -294,21 +340,20 @@ class BaseTest(unittest.TestCase): agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, SchemaClassId.KEY_PACKAGE]], - [QmfQuery.EQ, [QmfQuery.UNQUOTE, SchemaClassId.KEY_PACKAGE], [QmfQuery.QUOTE, "MyPackage"]]]) + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT) obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) + self.assertTrue(len(obj_list) == 1) for obj in obj_list: - self.failUnlessRaises(ValueError, - obj.invoke_method, - "unknown_meth", - {"arg1": -99, "arg2": "Now set!"}, - _timeout=3) - self.console.destroy(10) + self.assertTrue(obj.get_schema_class_id() == None) + mr = obj.invoke_method("unknown_meth", + {"arg1": -99, "arg2": "Now set!"}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) + self.assertFalse(mr.succeeded()) + self.assertTrue(isinstance(mr.get_exception(), QmfData)) + self.console.destroy(10) def test_managed_obj(self): # create console diff --git a/python/qmf2/tests/basic_query.py b/python/qmf2/tests/basic_query.py index 5a1f4c55c8..0f45348d9f 100644 --- a/python/qmf2/tests/basic_query.py +++ b/python/qmf2/tests/basic_query.py @@ -94,13 +94,22 @@ class _agentApp(Thread): self.agent.add_object( _obj1 ) self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":99, + _values={"index1":99, "index2": "another name", "set_string": "UNSET", "set_int": 0, "query_count": 0, "method_call_count": 0} )) + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":50, + "index2": "my name", + "set_string": "SET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent _obj2 = QmfAgentData(self.agent, _object_id="01545") _obj2.set_value("field1", "a value") @@ -110,6 +119,30 @@ class _agentApp(Thread): _obj2.set_value("index1", 50) self.agent.add_object(_obj2) + _obj2 = QmfAgentData(self.agent, _object_id="01546") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 3) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _obj2.set_value("index1", 51) + self.agent.add_object(_obj2) + + _obj2 = QmfAgentData(self.agent, _object_id="01544") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 4) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _obj2.set_value("index1", 49) + self.agent.add_object(_obj2) + + _obj2 = QmfAgentData(self.agent, _object_id="01543") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 4) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _obj2.set_value("index1", 48) + self.agent.add_object(_obj2) + self.running = False self.ready = Event() @@ -178,7 +211,8 @@ class BaseTest(unittest.TestCase): def test_all_oids(self): # create console # find agents - # synchronous query for all objects by id + # synchronous query for all schemas + # synchronous query for all objects per schema # verify known object ids are returned self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, @@ -194,15 +228,39 @@ class BaseTest(unittest.TestCase): agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) + # first, find objects per schema + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == 1) + for sid in sid_list: + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID, + _target_params=t_params) + + oid_list = self.console.do_query(agent, query) + + self.assertTrue(isinstance(oid_list, type([])), + "Unexpected return type") + self.assertTrue(len(oid_list) == 3, "Wrong count") + self.assertTrue('100a name' in oid_list) + self.assertTrue('99another name' in oid_list) + self.assertTrue('50my name' in oid_list) + self.assertTrue('01545' not in oid_list) + + + # now, find all unmanaged objects (no schema) query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) oid_list = self.console.do_query(agent, query) self.assertTrue(isinstance(oid_list, type([])), "Unexpected return type") - self.assertTrue(len(oid_list) == 3, "Wrong count") - self.assertTrue('100a name' in oid_list) - self.assertTrue('99another name' in oid_list) + self.assertTrue(len(oid_list) == 4, "Wrong count") + self.assertTrue('100a name' not in oid_list) + self.assertTrue('99another name' not in oid_list) self.assertTrue('01545' in oid_list) + self.assertTrue('01544' in oid_list) + self.assertTrue('01543' in oid_list) + self.assertTrue('01546' in oid_list) self.console.destroy(10) @@ -226,8 +284,13 @@ class BaseTest(unittest.TestCase): agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) - for oid in ['100a name', '99another name', '01545']: - query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, oid) + # first, find objects per schema + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == 1) + + for oid in ['100a name', '99another name']: + query = QmfQuery.create_id_object(oid, sid_list[0]) obj_list = self.console.do_query(agent, query) self.assertTrue(isinstance(obj_list, type([])), @@ -236,15 +299,23 @@ class BaseTest(unittest.TestCase): obj = obj_list[0] self.assertTrue(isinstance(obj, QmfData)) self.assertTrue(obj.get_object_id() == oid) + self.assertTrue(obj.get_schema_class_id() == sid_list[0]) + schema_id = obj.get_schema_class_id() + self.assertTrue(isinstance(schema_id, SchemaClassId)) + self.assertTrue(obj.is_described()) + + # now find schema-less objects + for oid in ['01545']: + query = QmfQuery.create_id_object(oid) + obj_list = self.console.do_query(agent, query) - if obj.is_described(): - self.assertTrue(oid in ['100a name', '99another name']) - schema_id = obj.get_schema_class_id() - self.assertTrue(isinstance(schema_id, SchemaClassId)) - else: - self.assertTrue(oid == "01545") - - + self.assertTrue(isinstance(obj_list, type([])), + "Unexpected return type") + self.assertTrue(len(obj_list) == 1) + obj = obj_list[0] + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == oid) + self.assertFalse(obj.is_described()) self.console.destroy(10) @@ -360,11 +431,21 @@ class BaseTest(unittest.TestCase): agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) + # get the schema id for MyPackage:MyClass schema + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, + [QmfQuery.AND, + [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, + [QmfQuery.QUOTE, "MyPackage"]], + [QmfQuery.EQ, SchemaClassId.KEY_CLASS, + [QmfQuery.QUOTE, "MyClass"]]]) + sid_list = self.console.do_query(agent, query) + self.assertTrue(len(sid_list) == 1) + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "set_string"]], - [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]]) - + [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]], + _target_params={QmfData.KEY_SCHEMA_ID: sid_list[0]}) obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 2) for obj in obj_list: @@ -394,41 +475,43 @@ class BaseTest(unittest.TestCase): agent = self.console.find_agent(aname, timeout=3) self.assertTrue(agent and agent.get_name() == aname) - # == 99 + # Query the unmanaged (no schema) objects + + # == 50 query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.EQ, "index1", 99]]) + [QmfQuery.EQ, "index1", 50]]) obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 1) self.assertTrue(obj_list[0].has_value("index1")) - self.assertTrue(obj_list[0].get_value("index1") == 99) + self.assertTrue(obj_list[0].get_value("index1") == 50) - # <= 99 + # <= 50 query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.LE, "index1", 99]]) + [QmfQuery.LE, "index1", 50]]) obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) + self.assertTrue(len(obj_list) == 3) for obj in obj_list: self.assertTrue(obj.has_value("index1")) - self.assertTrue(obj.get_value("index1") <= 99) + self.assertTrue(obj.get_value("index1") <= 50) - # > 99 + # > 50 query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, [QmfQuery.AND, [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.GT, "index1", 99]]) + [QmfQuery.GT, "index1", 50]]) obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 1) for obj in obj_list: self.assertTrue(obj.has_value("index1")) - self.assertTrue(obj.get_value("index1") > 99) + self.assertTrue(obj.get_value("index1") > 50) self.console.destroy(10) diff --git a/python/qmf2/tests/events.py b/python/qmf2/tests/events.py index a621aa0392..9a96fbd9a4 100644 --- a/python/qmf2/tests/events.py +++ b/python/qmf2/tests/events.py @@ -84,7 +84,6 @@ class _agentApp(Thread): if not self.ready.is_set(): raise Exception("Agent failed to connect to broker.") # time.sleep(1) - print("!!! agent=%s setup complete (%s)" % (self.agent, time.time())) def stop_app(self): self.running = False @@ -106,7 +105,6 @@ class _agentApp(Thread): raise Skipped(e) self.agent.set_connection(conn) - print("!!! agent=%s connection done (%s)" % (self.agent, time.time())) self.ready.set() counter = 1 diff --git a/python/qmf2/tests/obj_gets.py b/python/qmf2/tests/obj_gets.py index 514121980e..43f2da5da2 100644 --- a/python/qmf2/tests/obj_gets.py +++ b/python/qmf2/tests/obj_gets.py @@ -17,6 +17,7 @@ # import unittest import logging +import datetime from threading import Thread, Event import qpid.messaging @@ -193,8 +194,10 @@ class BaseTest(unittest.TestCase): agent = _agentApp("agent-" + str(i), self.broker, 1) agent.start_app() self.agents.append(agent) + #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow()) def tearDown(self): + #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow()) for agent in self.agents: if agent is not None: agent.stop_app() @@ -221,25 +224,34 @@ class BaseTest(unittest.TestCase): self.assertTrue(agent and agent.get_name() == aname) # console has discovered all agents, now query all undesc-2 objects + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) objs = self.console.get_objects(_object_id="undesc-2", _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) self.assertTrue(len(objs) == self.agent_count) for obj in objs: self.assertTrue(obj.get_object_id() == "undesc-2") # now query all objects from schema "package1" + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) objs = self.console.get_objects(_pname="package1", _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) self.assertTrue(len(objs) == (self.agent_count * 3)) for obj in objs: self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") # now query all objects from schema "package2" + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) objs = self.console.get_objects(_pname="package2", _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) self.assertTrue(len(objs) == (self.agent_count * 2)) for obj in objs: self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") # now query all objects from schema "package1/class2" - objs = self.console.get_objects(_pname="package1", _cname="class2", _timeout=5) + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_pname="package1", _cname="class2", + _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) self.assertTrue(len(objs) == self.agent_count) for obj in objs: self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") @@ -247,7 +259,9 @@ class BaseTest(unittest.TestCase): # given the schema identifier from the last query, repeat using the # specific schema id + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) schema_id = objs[0].get_schema_class_id() + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) objs = self.console.get_objects(_schema_id=schema_id, _timeout=5) self.assertTrue(len(objs) == self.agent_count) for obj in objs: @@ -352,7 +366,7 @@ class BaseTest(unittest.TestCase): self.assertTrue(agent and agent.get_name() == aname) agent_list.append(agent) - # Only use one agetn + # Only use one agent agent = agent_list[0] # console has discovered all agents, now query all undesc-2 objects @@ -400,3 +414,104 @@ class BaseTest(unittest.TestCase): self.console.destroy(10) + + + def test_all_objs_by_oid(self): + # create console + # find all agents + # synchronous query for all described objects by: + # oid & schema_id + # oid & package name + # oid & package and class name + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now query all objects from schema "package1" + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_pname="package1", + _object_id="p1c1_key1", _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") + self.assertTrue(obj.get_object_id() == "p1c1_key1") + # mooch the schema for a later test + schema_id_p1c1 = objs[0].get_schema_class_id() + + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_pname="package1", + _object_id="p1c2_name1", _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + self.assertTrue(obj.get_object_id() == "p1c2_name1") + + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_pname="package2", _cname="class1", + _object_id="p2c1_key1", _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") + self.assertTrue(obj.get_object_id() == "p2c1_key1") + + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_schema_id=schema_id_p1c1, + _object_id="p1c1_key2", _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") + self.assertTrue(obj.get_object_id() == "p1c1_key2") + + # this should return all "undescribed" objects + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(len(objs) == (self.agent_count * 2)) + for obj in objs: + self.assertTrue(obj.get_object_id() == "undesc-1" or + obj.get_object_id() == "undesc-2") + + # these should fail + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_schema_id=schema_id_p1c1, + _object_id="does not exist", + _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(objs == None) + + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_pname="package2", + _object_id="does not exist", + _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(objs == None) + + #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) + objs = self.console.get_objects(_pname="package3", + _object_id="does not exist", + _timeout=5) + #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) + self.assertTrue(objs == None) + + self.console.destroy(10) + |