diff options
-rw-r--r-- | python/qmf2/common.py | 1 | ||||
-rw-r--r-- | python/qmf2/console.py | 439 | ||||
-rw-r--r-- | python/qmf2/tests/__init__.py | 1 | ||||
-rw-r--r-- | python/qmf2/tests/async_query.py | 460 |
4 files changed, 840 insertions, 61 deletions
diff --git a/python/qmf2/common.py b/python/qmf2/common.py index 66cb1f3a7d..10ea994a16 100644 --- a/python/qmf2/common.py +++ b/python/qmf2/common.py @@ -131,6 +131,7 @@ class WorkItem(object): OBJECT_UPDATE=5 EVENT_RECEIVED=7 AGENT_HEARTBEAT=8 + QUERY_COMPLETE=9 # Enumeration of the types of WorkItems produced on the Agent METHOD_CALL=1000 QUERY=1001 diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 441b770d53..7b4bfe9fb8 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -56,21 +56,47 @@ class _Mailbox(object): """ Virtual base class for all Mailbox-like objects. """ + def __init__(self, console): + self.console = console + self.cid = 0 + self.console._add_mailbox(self) + + def get_address(self): + return self.cid + def deliver(self, data): + """ + Invoked by Console Management thread when a message arrives for + this mailbox. + """ raise Exception("_Mailbox deliver() method must be provided") + def destroy(self): + """ + Release the mailbox. Once called, the mailbox should no longer be + referenced. + """ + self.console._remove_mailbox(self.cid) -class _WaitableMailbox(_Mailbox): + +class _SyncMailbox(_Mailbox): """ A simple mailbox that allows a consumer to wait for delivery of data. """ - def __init__(self): - self._data = [] + def __init__(self, console): + """ + Invoked by application thread. + """ + super(_SyncMailbox, self).__init__(console) self._cv = Condition() + self._data = [] self._waiting = False def deliver(self, data): - """ Drop data into the mailbox, waking any waiters if necessary. """ + """ + Drop data into the mailbox, waking any waiters if necessary. + Invoked by Console Management thread only. + """ self._cv.acquire() try: self._data.append(data) @@ -81,7 +107,10 @@ class _WaitableMailbox(_Mailbox): self._cv.release() def fetch(self, timeout=None): - """ Get one data item from a mailbox, with timeout. """ + """ + Get one data item from a mailbox, with timeout. + Invoked by application thread. + """ self._cv.acquire() try: if len(self._data) == 0: @@ -93,6 +122,189 @@ class _WaitableMailbox(_Mailbox): self._cv.release() +class _AsyncMailbox(_Mailbox): + """ + A Mailbox for asynchronous delivery, with a timeout value. + """ + def __init__(self, console, + agent_name, + _timeout=None): + """ + Invoked by application thread. + """ + super(_AsyncMailbox, self).__init__(console) + + self.agent_name = agent_name + self.console = console + + if _timeout is None: + _timeout = console._reply_timeout + self.expiration_date = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=_timeout)) + console._lock.acquire() + try: + console._async_mboxes[self.cid] = self + finally: + console._lock.release() + + # now that an async mbox has been created, wake the + # console mgmt thread so it will know about the mbox expiration + # date (and adjust its idle sleep period correctly) + + console._wake_thread() + + def deliver(self, msg): + """ + """ + raise Exception("deliver() method must be provided") + + def expire(self): + raise Exception("expire() method must be provided") + + + def destroy(self): + self.console._lock.acquire() + try: + if self.cid in self.console._async_mboxes: + del self.console._async_mboxes[self.cid] + finally: + self.console._lock.release() + super(_AsyncMailbox, self).destroy() + + + +class _QueryMailbox(_AsyncMailbox): + """ + A mailbox used for asynchronous query requests. + """ + def __init__(self, console, + agent_name, + context, + target, msgkey, + _timeout=None): + """ + Invoked by application thread. + """ + super(_QueryMailbox, self).__init__(console, + agent_name, + _timeout) + self.target = target + self.msgkey = msgkey + self.context = context + self.result = [] + + def deliver(self, reply): + """ + Process query response messages delivered to this mailbox. + Invoked by Console Management thread only. + """ + done = False + objects = reply.content.get(self.msgkey) + if not objects: + done = True + else: + # convert from map to native types if needed + if self.target == QmfQuery.TARGET_SCHEMA_ID: + for sid_map in objects: + self.result.append(SchemaClassId.from_map(sid_map)) + + elif self.target == QmfQuery.TARGET_SCHEMA: + for schema_map in objects: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self.console._add_schema(schema) # add to schema cache + self.result.append(schema) + + elif self.target == QmfQuery.TARGET_OBJECT: + for obj_map in objects: + # @todo: need the agent name - ideally from the + # reply message iself. + agent = self.console.get_agent(self.agent_name) + if agent: + obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self.console._prefetch_schema(sid, agent) + self.result.append(obj) + + + else: + # no conversion needed. + self.result += objects + + if done: + # create workitem + # logging.error("QUERY COMPLETE for %s" % str(self.context)) + wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + def expire(self): + logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % + datetime.datetime.utcnow()) + # send along whatever (possibly none) has been received so far + wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + +class _SchemaPrefetchMailbox(_AsyncMailbox): + """ + Handles responses to schema fetches made by the console. + """ + def __init__(self, console, + agent_name, + schema_id, + _timeout=None): + """ + Invoked by application thread. + """ + super(_SchemaPrefetchMailbox, self).__init__(console, + agent_name, + _timeout) + + self.schema_id = schema_id + + + def deliver(self, reply): + """ + Process schema response messages. + """ + done = False + schemas = reply.content.get(MsgKey.schema) + if schemas: + for schema_map in schemas: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self.console._add_schema(schema) # add to schema cache + self.destroy() + + + def expire(self): + self.destroy() + + ##============================================================================== ## DATA MODEL @@ -185,8 +397,8 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - mbox = _WaitableMailbox() - cid = self._agent._console._add_mailbox(mbox) + mbox = _SyncMailbox(self._agent._console) + cid = mbox.get_address() _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} @@ -202,7 +414,7 @@ class QmfConsoleData(QmfData): self._agent._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._agent._console._remove_mailbox(cid) + mbox.destroy() return None # @todo async method calls!!! @@ -211,7 +423,7 @@ class QmfConsoleData(QmfData): logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) - self._agent._console._remove_mailbox(cid) + mbox.destroy() if not replyMsg: logging.debug("Agent method req wait timed-out.") @@ -258,7 +470,7 @@ class Agent(object): """ def __init__(self, name, console): """ - @type name: AgentId + @type name: string @param name: uniquely identifies this agent in the AMQP domain. """ @@ -358,8 +570,8 @@ class Agent(object): if _in_args: _in_args = _in_args.copy() - mbox = _WaitableMailbox() - cid = self._console._add_mailbox(mbox) + mbox = _SyncMailbox(self._console) + cid = mbox.get_address() _map = {SchemaMethod.KEY_NAME:name} if _in_args: @@ -370,7 +582,7 @@ class Agent(object): self._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._console._remove_mailbox(cid) + mbox.destroy() return None # @todo async method calls!!! @@ -379,7 +591,7 @@ class Agent(object): logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) - self._console._remove_mailbox(cid) + mbox.destroy() if not replyMsg: logging.debug("Agent method req wait timed-out.") @@ -497,19 +709,20 @@ class Console(Thread): self._announce_recvr = None self._locate_sender = None self._schema_cache = {} + self._pending_schema_req = [] self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout self._next_agent_expire = None - # lock out run() thread - self._cv = Condition() + self._next_mbox_expire = None # for passing WorkItems to the application self._work_q = Queue.Queue() self._work_q_put = False # Correlation ID and mailbox storage self._correlation_id = long(time.time()) # pseudo-randomize self._post_office = {} # indexed by cid - + self._async_mboxes = {} # indexed by cid, used to expire them + ## Old stuff below??? #self._broker_list = [] #self.impl = qmfengine.Console() @@ -612,15 +825,7 @@ class Console(Thread): self._operational = False if self.isAlive(): # kick my thread to wake it up - logging.debug("Sending noop to wake up [%s]" % self._address) - try: - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.noop)}, - subject=self._name, - content={"noop":"noop"}) - self._direct_sender.send( msg, sync=True ) - except SendError, e: - logging.error(str(e)) + self._wake_thread() logging.debug("waiting for console receiver thread to exit") self.join(timeout) if self.isAlive(): @@ -651,8 +856,8 @@ class Console(Thread): self._lock.acquire() try: - if agent._id in self._agent_map: - del self._agent_map[agent._id] + if agent._name in self._agent_map: + del self._agent_map[agent._name] finally: self._lock.release() @@ -672,8 +877,8 @@ class Console(Thread): # agent not present yet - ping it with an agent_locate - mbox = _WaitableMailbox() - cid = self._add_mailbox(mbox) + mbox = _SyncMailbox(self) + cid = mbox.get_address() query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject="console.ind.locate." + name, @@ -690,7 +895,7 @@ class Console(Thread): self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) - self._remove_mailbox(cid) + mbox.destroy() return None if timeout is None: @@ -699,7 +904,7 @@ class Console(Thread): new_agent = None logging.debug("Waiting for response to Agent Locate (%s)" % timeout) mbox.fetch(timeout) - self._remove_mailbox(cid) + mbox.destroy() logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: @@ -749,15 +954,15 @@ class Console(Thread): if not msgkey: raise Exception("Invalid target for query: %s" % str(query)) - mbox = _WaitableMailbox() - cid = self._add_mailbox(mbox) + mbox = _SyncMailbox(self) + cid = mbox.get_address() try: logging.debug("Sending Query to Agent (%s)" % time.time()) agent._send_query(query, cid) except SendError, e: logging.error(str(e)) - self._remove_mailbox(cid) + mbox.destroy() return None if not timeout: @@ -802,33 +1007,74 @@ class Console(Thread): elif target == QmfQuery.TARGET_OBJECT: for obj_map in objects: obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self._prefetch_schema(sid, agent) response.append(obj) - # @todo prefetch unknown schema - # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) - # if sid_map: - # sid = SchemaClassId.from_map(sid_map) - # # if the object references a schema, fetch it - # # schema = self._fetch_schema(sid, _agent=agent, - # # _timeout=timeout) - # # if not schema: - # # logging.warning("Unknown schema, id=%s" % sid) - # # continue - # obj = QmfConsoleData(map_=obj_map, agent=agent, - # _schema=schema) - # else: - # # no schema needed else: # no conversion needed. response += objects now = datetime.datetime.utcnow() - self._remove_mailbox(cid) + mbox.destroy() return response + def do_async_query(self, agent, query, app_handle, _timeout=None ): + """ + """ + query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, + QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, + QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, + QmfQuery.TARGET_SCHEMA: MsgKey.schema, + QmfQuery.TARGET_OBJECT: MsgKey.data_obj, + QmfQuery.TARGET_AGENT: MsgKey.agent_info} + + target = query.get_target() + msgkey = query_keymap.get(target) + if not msgkey: + raise Exception("Invalid target for query: %s" % str(query)) + + mbox = _QueryMailbox(self, + agent.get_name(), + app_handle, + target, msgkey, + _timeout) + cid = mbox.get_address() + + try: + logging.debug("Sending Query to Agent (%s)" % time.time()) + agent._send_query(query, cid) + except SendError, e: + logging.error(str(e)) + mbox.destroy() + return False + return True + + + def _wake_thread(self): + """ + Make the console management thread loop wakeup from its next_receiver + sleep. + """ + logging.debug("Sending noop to wake up [%s]" % self._address) + msg = Message(properties={"method":"request", + "qmf.subject":make_subject(OpCode.noop)}, + subject=self._name, + content={"noop":"noop"}) + try: + self._direct_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + def run(self): + """ + Console Management Thread main loop. + Handles inbound messages, agent discovery, async mailbox timeouts. + """ global _callback_thread self._ready.set() @@ -858,6 +1104,7 @@ class Console(Thread): self._dispatch(msg, _direct=True) self._expire_agents() # check for expired agents + self._expire_mboxes() # check for expired async mailbox requests #if qLen == 0 and self._work_q.qsize() and self._notifier: if self._work_q_put and self._notifier: @@ -869,11 +1116,15 @@ class Console(Thread): _callback_thread = None if self._operational: - # wait for a message to arrive or an agent - # to expire + # wait for a message to arrive, or an agent + # to expire, or a mailbox requrest to time out now = datetime.datetime.utcnow() - if self._next_agent_expire > now: - timeout = timedelta_to_secs(self._next_agent_expire - now) + next_expire = self._next_agent_expire + if (self._next_mbox_expire and + self._next_mbox_expire < next_expire): + next_expire = self._next_mbox_expire + if next_expire > now: + timeout = timedelta_to_secs(next_expire - now) try: logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) xxx = self._session.next_receiver(timeout = timeout) @@ -1089,7 +1340,8 @@ class Console(Thread): return # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + logging.debug("waking waiters for correlation id %s" % + msg.correlation_id) mbox.deliver(msg) @@ -1151,6 +1403,36 @@ class Console(Thread): self._work_q_put = True + def _expire_mboxes(self): + """ + Check all async mailboxes for outstanding requests that have expired. + """ + now = datetime.datetime.utcnow() + if self._next_mbox_expire and now < self._next_mbox_expire: + return + expired_mboxes = [] + self._next_mbox_expire = None + self._lock.acquire() + try: + for mbox in self._async_mboxes.itervalues(): + if now >= mbox.expiration_date: + expired_mboxes.append(mbox) + else: + if (self._next_mbox_expire is None or + mbox.expiration_date < self._next_mbox_expire): + self._next_mbox_expire = mbox.expiration_date + + for mbox in expired_mboxes: + del self._async_mboxes[mbox.cid] + finally: + self._lock.release() + + for mbox in expired_mboxes: + # note: expire() may deallocate the mbox, so don't touch + # it further. + mbox.expire() + + def _expire_agents(self): """ Check for expired agents and issue notifications when they expire. @@ -1288,9 +1570,43 @@ class Console(Thread): sid = schema.get_class_id() if not self._schema_cache.has_key(sid): self._schema_cache[sid] = schema + if sid in self._pending_schema_req: + self._pending_schema_req.remove(sid) + finally: + self._lock.release() + + def _prefetch_schema(self, schema_id, agent): + """ + Send an async request for the schema identified by schema_id if the + schema is not available in the cache. + """ + need_fetch = False + self._lock.acquire() + try: + if ((not self._schema_cache.has_key(schema_id)) and + schema_id not in self._pending_schema_req): + self._pending_schema_req.append(schema_id) + need_fetch = True finally: self._lock.release() + if need_fetch: + mbox = _SchemaPrefetchMailbox(self, agent.get_name(), + schema_id) + query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) + logging.debug("Sending Schema Query to Agent (%s)" % time.time()) + try: + agent._send_query(query, mbox.get_address()) + except SendError, e: + logging.error(str(e)) + mbox.destroy() + self._lock.acquire() + try: + self._pending_schema_req.remove(schema_id) + finally: + self._lock.release() + + def _fetch_schema(self, schema_id, _agent=None, _timeout=None): """ Find the schema identified by schema_id. If not in the cache, ask the @@ -1320,16 +1636,16 @@ class Console(Thread): return None def _add_mailbox(self, mbox): - """ Add a mailbox to the post office, return a unique identifier """ - cid = 0 + """ + Add a mailbox to the post office, and assign it a unique address. + """ self._lock.acquire() try: - cid = self._correlation_id + mbox.cid = self._correlation_id self._correlation_id += 1 - self._post_office[cid] = mbox + self._post_office[mbox.cid] = mbox finally: self._lock.release() - return cid def _get_mailbox(self, mid): try: @@ -1355,7 +1671,8 @@ class Console(Thread): self._lock.acquire() try: - del self._post_office[mid] + if mid in self._post_office: + del self._post_office[mid] finally: self._lock.release() diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py index 16acf50d79..9fabdb9ef5 100644 --- a/python/qmf2/tests/__init__.py +++ b/python/qmf2/tests/__init__.py @@ -25,3 +25,4 @@ import basic_method import obj_gets import events import multi_response +import async_query diff --git a/python/qmf2/tests/async_query.py b/python/qmf2/tests/async_query.py new file mode 100644 index 0000000000..f598939d46 --- /dev/null +++ b/python/qmf2/tests/async_query.py @@ -0,0 +1,460 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, WorkItem) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.broker_url = broker_url + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( self.agent, _schema=_schema, + _values={"index1":100, "index2":"a name"}) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + self.agent.add_object( _obj1 ) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":50, + "index2": "my name", + "set_string": "SET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(self.agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _obj2.set_value("index1", 50) + self.agent.add_object(_obj2) + + _obj2 = QmfAgentData(self.agent, _object_id="01546") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 3) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _obj2.set_value("index1", 51) + self.agent.add_object(_obj2) + + _obj2 = QmfAgentData(self.agent, _object_id="01544") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 4) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _obj2.set_value("index1", 49) + self.agent.add_object(_obj2) + + _obj2 = QmfAgentData(self.agent, _object_id="01543") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 4) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _obj2.set_value("index1", 48) + self.agent.add_object(_obj2) + + self.running = False + self.ready = Event() + + def start_app(self): + self.running = True + self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + + def stop_app(self): + self.running = False + # wake main thread + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() + + def tearDown(self): + if self.agent1: + self.agent1.stop_app() + self.agent1 = None + if self.agent2: + self.agent2.stop_app() + self.agent2 = None + + def test_all_schema_ids(self): + # create console + # find agents + # asynchronous query for all schema ids + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # send queries + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + rc = self.console.do_async_query(agent, query, aname) + self.assertTrue(rc) + + # done. Now wait for async responses + + count = 0 + while self.notifier.wait_for_work(3): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + count += 1 + self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) + self.assertTrue(wi.get_handle() == "agent1" or + wi.get_handle() == "agent2") + reply = wi.get_params() + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], SchemaClassId)) + self.assertTrue(reply[0].get_package_name() == "MyPackage") + self.assertTrue(reply[0].get_class_name() == "MyClass") + self.console.release_workitem(wi) + wi = self.console.get_next_workitem(timeout=0) + + self.assertTrue(count == 2) + self.console.destroy(10) + + + + def test_undescribed_objs(self): + # create console + # find agents + # asynchronous query for all non-schema objects + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # send queries + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT) + rc = self.console.do_async_query(agent, query, aname) + self.assertTrue(rc) + + # done. Now wait for async responses + + count = 0 + while self.notifier.wait_for_work(3): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + count += 1 + self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) + self.assertTrue(wi.get_handle() == "agent1" or + wi.get_handle() == "agent2") + reply = wi.get_params() + self.assertTrue(len(reply) == 4) + self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData)) + self.assertFalse(reply[0].is_described()) # no schema + self.console.release_workitem(wi) + wi = self.console.get_next_workitem(timeout=0) + + self.assertTrue(count == 2) + self.console.destroy(10) + + + + def test_described_objs(self): + # create console + # find agents + # asynchronous query for all schema-based objects + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # + t_params = {QmfData.KEY_SCHEMA_ID: SchemaClassId("MyPackage", "MyClass")} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) + # + rc = self.console.do_async_query(agent, query, aname) + self.assertTrue(rc) + + # done. Now wait for async responses + + count = 0 + while self.notifier.wait_for_work(3): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + count += 1 + self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) + self.assertTrue(wi.get_handle() == "agent1" or + wi.get_handle() == "agent2") + reply = wi.get_params() + self.assertTrue(len(reply) == 3) + self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData)) + self.assertTrue(reply[0].is_described()) # has schema + self.console.release_workitem(wi) + wi = self.console.get_next_workitem(timeout=0) + + self.assertTrue(count == 2) + # @todo test if the console has learned the corresponding schemas.... + self.console.destroy(10) + + + + def test_all_schemas(self): + # create console + # find agents + # asynchronous query for all schemas + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # test internal state using non-api calls: + # no schemas present yet + self.assertTrue(len(self.console._schema_cache) == 0) + # end test + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # send queries + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) + rc = self.console.do_async_query(agent, query, aname) + self.assertTrue(rc) + + # done. Now wait for async responses + + count = 0 + while self.notifier.wait_for_work(3): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + count += 1 + self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) + self.assertTrue(wi.get_handle() == "agent1" or + wi.get_handle() == "agent2") + reply = wi.get_params() + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], qmf2.common.SchemaObjectClass)) + self.assertTrue(reply[0].get_class_id().get_package_name() == "MyPackage") + self.assertTrue(reply[0].get_class_id().get_class_name() == "MyClass") + self.console.release_workitem(wi) + wi = self.console.get_next_workitem(timeout=0) + + self.assertTrue(count == 2) + + # test internal state using non-api calls: + # schema has been learned + self.assertTrue(len(self.console._schema_cache) == 1) + # end test + + self.console.destroy(10) + + + + def test_query_expiration(self): + # create console + # find agents + # kill the agents + # send async query + # wait for & verify expiration + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=30) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # find the agents + agents = [] + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + agents.append(agent) + + # now nuke the agents from orbit. It's the only way to be sure. + + self.agent1.stop_app() + self.agent1 = None + self.agent2.stop_app() + self.agent2 = None + + # now send queries to agents that no longer exist + for agent in agents: + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) + rc = self.console.do_async_query(agent, query, agent.get_name(), + _timeout=2) + self.assertTrue(rc) + + # done. Now wait for async responses due to timeouts + + count = 0 + while self.notifier.wait_for_work(3): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + count += 1 + self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) + self.assertTrue(wi.get_handle() == "agent1" or + wi.get_handle() == "agent2") + reply = wi.get_params() + self.assertTrue(len(reply) == 0) # empty + + self.console.release_workitem(wi) + wi = self.console.get_next_workitem(timeout=0) + + self.assertTrue(count == 2) + self.console.destroy(10) |