diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-04 19:38:55 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-04 19:38:55 +0000 |
commit | ab53c7c55069282cb6ec1e38c17b46112c9d21f1 (patch) | |
tree | f1bd78327102a1897d2e9b1ffc781e8f40424c26 | |
parent | 62b6eaea83d81155695d19dc716ad97094e89e54 (diff) | |
download | qpid-python-ab53c7c55069282cb6ec1e38c17b46112c9d21f1.tar.gz |
QPID-2261: add multi-msg query response support. Fix mailbox code to allow mult-msg per correlation id.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@906615 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qmf2/agent.py | 76 | ||||
-rw-r--r-- | python/qmf2/common.py | 22 | ||||
-rw-r--r-- | python/qmf2/console.py | 384 | ||||
-rw-r--r-- | python/qmf2/tests/__init__.py | 1 | ||||
-rw-r--r-- | python/qmf2/tests/multi_response.py | 295 |
5 files changed, 546 insertions, 232 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py index 790cec283c..a6b3c39ad1 100644 --- a/python/qmf2/agent.py +++ b/python/qmf2/agent.py @@ -95,6 +95,8 @@ class Agent(Thread): self._address = QmfAddress.direct(self.name, self._domain) self._notifier = _notifier self._heartbeat_interval = _heartbeat_interval + # @todo: currently, max # of objects in a single reply message, would + # be better if it were max bytesize of per-msg content... self._max_msg_size = _max_msg_size self._capacity = _capacity @@ -456,6 +458,38 @@ class Agent(Thread): except SendError, e: logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) + def _send_query_response(self, subject, msgkey, cid, reply_to, objects): + """ + Send a response to a query, breaking the result into multiple + messages based on the agent's _max_msg_size config parameter + """ + + total = len(objects) + if self._max_msg_size: + max_count = self._max_msg_size + else: + max_count = total + + start = 0 + end = min(total, max_count) + while end <= total: + m = Message(properties={"qmf.subject":subject, + "method":"response"}, + correlation_id = cid, + content={msgkey:objects[start:end]}) + self._send_reply(m, reply_to) + if end == total: + break; + start = end + end = min(total, end + max_count) + + # response terminator - last message has empty object array + if total: + m = Message(properties={"qmf.subject":subject, + "method":"response"}, + correlation_id = cid, + content={msgkey: []} ) + self._send_reply(m, reply_to) def _dispatch(self, msg, _direct=False): """ @@ -615,12 +649,11 @@ class Agent(Thread): finally: self._lock.release() - m = Message(properties={"qmf.subject":make_subject(OpCode.data_ind), - "method":"response"}, - content={MsgKey.package_info: pnames} ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id - self._send_reply(m, msg.reply_to) + self._send_query_response(make_subject(OpCode.data_ind), + MsgKey.package_info, + msg.correlation_id, + msg.reply_to, + pnames) def _querySchema( self, msg, query, _idOnly=False ): """ @@ -652,17 +685,15 @@ class Agent(Thread): self._lock.release() if _idOnly: - content = {MsgKey.schema_id: schemas} + msgkey = MsgKey.schema_id else: - content = {MsgKey.schema:schemas} - - m = Message(properties={"method":"response", - "qmf.subject":make_subject(OpCode.data_ind)}, - content=content ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id + msgkey = MsgKey.schema - self._send_reply(m, msg.reply_to) + self._send_query_response(make_subject(OpCode.data_ind), + msgkey, + msg.correlation_id, + msg.reply_to, + schemas) def _queryData( self, msg, query, _idOnly=False ): @@ -736,17 +767,16 @@ class Agent(Thread): self._lock.release() if _idOnly: - content = {MsgKey.object_id:data_objs} + msgkey = MsgKey.object_id else: - content = {MsgKey.data_obj:data_objs} + msgkey = MsgKey.data_obj - m = Message(properties={"method":"response", - "qmf.subject":make_subject(OpCode.data_ind)}, - content=content ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id + self._send_query_response(make_subject(OpCode.data_ind), + msgkey, + msg.correlation_id, + msg.reply_to, + data_objs) - self._send_reply(m, msg.reply_to) ##============================================================================== diff --git a/python/qmf2/common.py b/python/qmf2/common.py index 280cee3576..66cb1f3a7d 100644 --- a/python/qmf2/common.py +++ b/python/qmf2/common.py @@ -969,6 +969,28 @@ class QmfQuery(_mapEncoder): return cls(_target=target, _target_params=_target_params) create_wildcard = classmethod(_create_wildcard) + def _create_wildcard_object_id(cls, schema_id): + """ + Create a wildcard to match all object_ids for a given schema. + """ + if not isinstance(schema_id, SchemaClassId): + raise TypeError("class SchemaClassId expected") + params = {QmfData.KEY_SCHEMA_ID: schema_id} + return cls(_target=QmfQuery.TARGET_OBJECT_ID, + _target_params=params) + create_wildcard_object_id = classmethod(_create_wildcard_object_id) + + def _create_wildcard_object(cls, schema_id): + """ + Create a wildcard to match all objects for a given schema. + """ + if not isinstance(schema_id, SchemaClassId): + raise TypeError("class SchemaClassId expected") + params = {QmfData.KEY_SCHEMA_ID: schema_id} + return cls(_target=QmfQuery.TARGET_OBJECT, + _target_params=params) + create_wildcard_object = classmethod(_create_wildcard_object) + def _create_predicate(cls, target, predicate, _target_params=None): return cls(_target=target, _target_params=_target_params, _predicate=predicate) diff --git a/python/qmf2/console.py b/python/qmf2/console.py index df0f93f1da..441b770d53 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -44,146 +44,56 @@ _callback_thread=None ##============================================================================== -## Sequence Manager +## Console Transaction Management +## +## At any given time, a console application may have multiple outstanding +## message transactions with agents. The following objects allow the console +## to track these outstanding transactions. ##============================================================================== + class _Mailbox(object): """ - Virtual base class for all Mailbox-like objects + Virtual base class for all Mailbox-like objects. + """ + def deliver(self, data): + raise Exception("_Mailbox deliver() method must be provided") + + +class _WaitableMailbox(_Mailbox): + """ + A simple mailbox that allows a consumer to wait for delivery of data. """ def __init__(self): - self._msgs = [] + self._data = [] self._cv = Condition() self._waiting = False - def deliver(self, obj): + def deliver(self, data): + """ Drop data into the mailbox, waking any waiters if necessary. """ self._cv.acquire() try: - self._msgs.append(obj) + self._data.append(data) # if was empty, notify waiters - if len(self._msgs) == 1: + if len(self._data) == 1: self._cv.notify() finally: self._cv.release() def fetch(self, timeout=None): + """ Get one data item from a mailbox, with timeout. """ self._cv.acquire() try: - if len(self._msgs) == 0: + if len(self._data) == 0: self._cv.wait(timeout) - if len(self._msgs): - return self._msgs.pop() + if len(self._data): + return self._data.pop(0) return None finally: self._cv.release() -class SequencedWaiter(object): - """ - Manage sequence numbers for asynchronous method calls. - Allows the caller to associate a generic piece of data with a unique sequence - number.""" - - def __init__(self): - self.lock = Lock() - self.sequence = long(time.time()) # pseudo-randomize seq start - self.pending = {} - - - def allocate(self): - """ - Reserve a sequence number. - - @rtype: long - @return: a unique nonzero sequence number. - """ - self.lock.acquire() - try: - seq = self.sequence - self.sequence = self.sequence + 1 - self.pending[seq] = _Mailbox() - finally: - self.lock.release() - logging.debug( "sequence %d allocated" % seq) - return seq - - - def put_data(self, seq, new_data): - seq = long(seq) - logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) ) - self.lock.acquire() - try: - if seq in self.pending: - # logging.error("Putting seq %d @ %s" % (seq,time.time())) - self.pending[seq].deliver(new_data) - else: - logging.error( "seq %d not found!" % seq ) - finally: - self.lock.release() - - - - def get_data(self, seq, timeout=None): - """ - Release a sequence number reserved using the reserve method. This must - be called when the sequence is no longer needed. - - @type seq: int - @param seq: a sequence previously allocated by calling reserve(). - @rtype: any - @return: the data originally associated with the reserved sequence number. - """ - seq = long(seq) - logging.debug( "getting data for seq=%d" % seq) - mbox = None - self.lock.acquire() - try: - if seq in self.pending: - mbox = self.pending[seq] - finally: - self.lock.release() - - # Note well: pending list is unlocked, so we can wait. - # we reference mbox locally, so it will not be released - # until we are done. - - if mbox: - d = mbox.fetch(timeout) - logging.debug( "seq %d fetched %r!" % (seq, d) ) - return d - - logging.debug( "seq %d not found!" % seq ) - return None - - - def release(self, seq): - """ - Release the sequence, and its mailbox - """ - seq = long(seq) - logging.debug( "releasing seq %d" % seq ) - self.lock.acquire() - try: - if seq in self.pending: - del self.pending[seq] - finally: - self.lock.release() - - - def is_valid(self, seq): - """ - True if seq is in use, else False (seq is unknown) - """ - seq = long(seq) - self.lock.acquire() - try: - return seq in self.pending - finally: - self.lock.release() - return False - - ##============================================================================== ## DATA MODEL ##============================================================================== @@ -275,9 +185,8 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - handle = self._agent._console._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + mbox = _WaitableMailbox() + cid = self._agent._console._add_mailbox(mbox) _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} @@ -290,10 +199,10 @@ class QmfConsoleData(QmfData): logging.debug("Sending method req to Agent (%s)" % time.time()) try: - self._agent._send_method_req(_map, handle) + self._agent._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._agent._console._req_correlation.release(handle) + self._agent._console._remove_mailbox(cid) return None # @todo async method calls!!! @@ -301,8 +210,9 @@ class QmfConsoleData(QmfData): print("ASYNC TBD") logging.debug("Waiting for response to method req (%s)" % _timeout) - replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout) - self._agent._console._req_correlation.release(handle) + replyMsg = mbox.fetch(_timeout) + self._agent._console._remove_mailbox(cid) + if not replyMsg: logging.debug("Agent method req wait timed-out.") return None @@ -376,10 +286,6 @@ class Agent(object): Low-level routine to asynchronously send a message to this agent. """ msg.reply_to = str(self._console._address) - # handle = self._console._req_correlation.allocate() - # if handle == 0: - # raise Exception("Can not allocate a correlation id!") - # msg.correlation_id = str(handle) if correlation_id: msg.correlation_id = str(correlation_id) # TRACE @@ -452,9 +358,8 @@ class Agent(object): if _in_args: _in_args = _in_args.copy() - handle = self._console._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + mbox = _WaitableMailbox() + cid = self._console._add_mailbox(mbox) _map = {SchemaMethod.KEY_NAME:name} if _in_args: @@ -462,10 +367,10 @@ class Agent(object): logging.debug("Sending method req to Agent (%s)" % time.time()) try: - self._send_method_req(_map, handle) + self._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._console._req_correlation.release(handle) + self._console._remove_mailbox(cid) return None # @todo async method calls!!! @@ -473,8 +378,9 @@ class Agent(object): print("ASYNC TBD") logging.debug("Waiting for response to method req (%s)" % _timeout) - replyMsg = self._console._req_correlation.get_data(handle, _timeout) - self._console._req_correlation.release(handle) + replyMsg = mbox.fetch(_timeout) + self._console._remove_mailbox(cid) + if not replyMsg: logging.debug("Agent method req wait timed-out.") return None @@ -591,7 +497,6 @@ class Console(Thread): self._announce_recvr = None self._locate_sender = None self._schema_cache = {} - self._req_correlation = SequencedWaiter() self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout @@ -601,6 +506,10 @@ class Console(Thread): # for passing WorkItems to the application self._work_q = Queue.Queue() self._work_q_put = False + # Correlation ID and mailbox storage + self._correlation_id = long(time.time()) # pseudo-randomize + self._post_office = {} # indexed by cid + ## Old stuff below??? #self._broker_list = [] #self.impl = qmfengine.Console() @@ -763,9 +672,8 @@ class Console(Thread): # agent not present yet - ping it with an agent_locate - handle = self._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + mbox = _WaitableMailbox() + cid = self._add_mailbox(mbox) query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject="console.ind.locate." + name, @@ -773,7 +681,7 @@ class Console(Thread): "qmf.subject":make_subject(OpCode.agent_locate)}, content={MsgKey.query: query.map_encode()}) msg.reply_to = str(self._address) - msg.correlation_id = str(handle) + msg.correlation_id = str(cid) logging.debug("Sending Agent Locate (%s)" % time.time()) # TRACE #logging.error("!!! Console %s sending agent locate (%s)" % @@ -782,7 +690,7 @@ class Console(Thread): self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) - self._req_correlation.release(handle) + self._remove_mailbox(cid) return None if timeout is None: @@ -790,14 +698,15 @@ class Console(Thread): new_agent = None logging.debug("Waiting for response to Agent Locate (%s)" % timeout) - self._req_correlation.get_data( handle, timeout ) - self._req_correlation.release(handle) + mbox.fetch(timeout) + self._remove_mailbox(cid) logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: new_agent = self._agent_map.get(name) finally: self._lock.release() + return new_agent @@ -828,82 +737,96 @@ class Console(Thread): def do_query(self, agent, query, timeout=None ): """ """ + query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, + QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, + QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, + QmfQuery.TARGET_SCHEMA: MsgKey.schema, + QmfQuery.TARGET_OBJECT: MsgKey.data_obj, + QmfQuery.TARGET_AGENT: MsgKey.agent_info} target = query.get_target() - handle = self._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + msgkey = query_keymap.get(target) + if not msgkey: + raise Exception("Invalid target for query: %s" % str(query)) + + mbox = _WaitableMailbox() + cid = self._add_mailbox(mbox) + try: logging.debug("Sending Query to Agent (%s)" % time.time()) - agent._send_query(query, handle) + agent._send_query(query, cid) except SendError, e: logging.error(str(e)) - self._req_correlation.release(handle) + self._remove_mailbox(cid) return None if not timeout: timeout = self._reply_timeout logging.debug("Waiting for response to Query (%s)" % timeout) - reply = self._req_correlation.get_data(handle, timeout) - self._req_correlation.release(handle) - if not reply: - logging.debug("Agent Query wait timed-out.") - return None + now = datetime.datetime.utcnow() + expire = now + datetime.timedelta(seconds=timeout) + + response = [] + while (expire > now): + timeout = timedelta_to_secs(expire - now) + reply = mbox.fetch(timeout) + if not reply: + logging.debug("Query wait timed-out.") + break + + objects = reply.content.get(msgkey) + if not objects: + # last response is empty + break + + # convert from map to native types if needed + if target == QmfQuery.TARGET_SCHEMA_ID: + for sid_map in objects: + response.append(SchemaClassId.from_map(sid_map)) + + elif target == QmfQuery.TARGET_SCHEMA: + for schema_map in objects: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self._add_schema(schema) # add to schema cache + response.append(schema) + + elif target == QmfQuery.TARGET_OBJECT: + for obj_map in objects: + obj = QmfConsoleData(map_=obj_map, agent=agent) + response.append(obj) + # @todo prefetch unknown schema + # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) + # if sid_map: + # sid = SchemaClassId.from_map(sid_map) + # # if the object references a schema, fetch it + # # schema = self._fetch_schema(sid, _agent=agent, + # # _timeout=timeout) + # # if not schema: + # # logging.warning("Unknown schema, id=%s" % sid) + # # continue + # obj = QmfConsoleData(map_=obj_map, agent=agent, + # _schema=schema) + # else: + # # no schema needed + else: + # no conversion needed. + response += objects + + now = datetime.datetime.utcnow() + + self._remove_mailbox(cid) + return response + - if target == QmfQuery.TARGET_PACKAGES: - # simply pass back the list of package names - logging.debug("Response to Packet Query received") - return reply.content.get(MsgKey.package_info) - elif target == QmfQuery.TARGET_OBJECT_ID: - # simply pass back the list of object_id's - logging.debug("Response to Object Id Query received") - return reply.content.get(MsgKey.object_id) - elif target == QmfQuery.TARGET_SCHEMA_ID: - logging.debug("Response to Schema Id Query received") - id_list = [] - for sid_map in reply.content.get(MsgKey.schema_id): - id_list.append(SchemaClassId.from_map(sid_map)) - return id_list - elif target == QmfQuery.TARGET_SCHEMA: - logging.debug("Response to Schema Query received") - schema_list = [] - for schema_map in reply.content.get(MsgKey.schema): - # extract schema id, convert based on schema type - sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) - if sid_map: - sid = SchemaClassId.from_map(sid_map) - if sid: - if sid.get_type() == SchemaClassId.TYPE_DATA: - schema = SchemaObjectClass.from_map(schema_map) - else: - schema = SchemaEventClass.from_map(schema_map) - schema_list.append(schema) - self._add_schema(schema) - return schema_list - elif target == QmfQuery.TARGET_OBJECT: - logging.debug("Response to Object Query received") - obj_list = [] - for obj_map in reply.content.get(MsgKey.data_obj): - obj = QmfConsoleData(map_=obj_map, agent=agent) - obj_list.append(obj) - # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) - # if sid_map: - # sid = SchemaClassId.from_map(sid_map) - # # if the object references a schema, fetch it - # # schema = self._fetch_schema(sid, _agent=agent, - # # _timeout=timeout) - # # if not schema: - # # logging.warning("Unknown schema, id=%s" % sid) - # # continue - # obj = QmfConsoleData(map_=obj_map, agent=agent, - # _schema=schema) - # else: - # # no schema needed - return obj_list - else: - logging.warning("Unexpected Target for a Query: '%s'" % target) - return None def run(self): global _callback_thread @@ -1115,7 +1038,8 @@ class Console(Thread): correlated = False if msg.correlation_id: - correlated = self._req_correlation.is_valid(msg.correlation_id) + mbox = self._get_mailbox(msg.correlation_id) + correlated = mbox is not None agent = None self._lock.acquire() @@ -1150,7 +1074,7 @@ class Console(Thread): if correlated: # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) + mbox.deliver(msg) def _handle_data_ind_msg(self, msg, cmap, version, direct): """ @@ -1158,14 +1082,15 @@ class Console(Thread): """ logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) - if not self._req_correlation.is_valid(msg.correlation_id): + mbox = self._get_mailbox(msg.correlation_id) + if not mbox: logging.debug("Data indicate received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) + mbox.deliver(msg) def _handle_response_msg(self, msg, cmap, version, direct): @@ -1175,14 +1100,15 @@ class Console(Thread): # @todo code replication - clean me. logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) - if not self._req_correlation.is_valid(msg.correlation_id): + mbox = self._get_mailbox(msg.correlation_id) + if not mbox: logging.debug("Response msg received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) + mbox.deliver(msg) def _handle_event_ind_msg(self, msg, cmap, version, _direct): ei_map = cmap.get(MsgKey.event) @@ -1393,6 +1319,46 @@ class Console(Thread): else: return None + def _add_mailbox(self, mbox): + """ Add a mailbox to the post office, return a unique identifier """ + cid = 0 + self._lock.acquire() + try: + cid = self._correlation_id + self._correlation_id += 1 + self._post_office[cid] = mbox + finally: + self._lock.release() + return cid + + def _get_mailbox(self, mid): + try: + mid = long(mid) + except TypeError: + logging.error("Invalid mailbox id: %s" % str(mid)) + return None + + self._lock.acquire() + try: + return self._post_office.get(mid) + finally: + self._lock.release() + + + def _remove_mailbox(self, mid): + """ Remove a mailbox and its address from the post office """ + try: + mid = long(mid) + except TypeError: + logging.error("Invalid mailbox id: %s" % str(mid)) + return None + + self._lock.acquire() + try: + del self._post_office[mid] + finally: + self._lock.release() + def __repr__(self): return str(self._address) diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py index e942c2fbc2..16acf50d79 100644 --- a/python/qmf2/tests/__init__.py +++ b/python/qmf2/tests/__init__.py @@ -24,3 +24,4 @@ import basic_query import basic_method import obj_gets import events +import multi_response diff --git a/python/qmf2/tests/multi_response.py b/python/qmf2/tests/multi_response.py new file mode 100644 index 0000000000..d3d00a70c5 --- /dev/null +++ b/python/qmf2/tests/multi_response.py @@ -0,0 +1,295 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + +# note: objects, schema per agent must each be > max objs +_SCHEMAS_PER_AGENT=7 +_OBJS_PER_AGENT=19 +_MAX_OBJS_PER_MSG=3 + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.schema_count = _SCHEMAS_PER_AGENT + self.obj_count = _OBJS_PER_AGENT + self.notifier = _testNotifier() + self.broker_url = broker_url + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat, + _max_msg_size=_MAX_OBJS_PER_MSG) + + # Dynamically construct a management database + for i in range(self.schema_count): + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", + "MyClass-" + str(i)), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + for j in range(self.obj_count): + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":j, + "index2": "name-" + str(j), + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + self.running = False + self.ready = Event() + + def start_app(self): + self.running = True + self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + + def stop_app(self): + self.running = False + # wake main thread + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.agent_count = 2 + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent_heartbeat = 1 + self.agents = [] + for a in range(self.agent_count): + agent = _agentApp("agent-" + str(a), + self.broker, + self.agent_heartbeat) + agent.start_app() + self.agents.append(agent) + + def tearDown(self): + for agent in self.agents: + if agent is not None: + agent.stop_app() + + def test_all_schema_id(self): + # create console + # find agents + # synchronous query for all schemas_ids + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for agent_app in self.agents: + agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) + self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) + + # get a list of all schema_ids + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT) + for sid in sid_list: + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "MyPackage") + self.assertTrue(sid.get_class_name().split('-')[0] == "MyClass") + + self.console.destroy(10) + + + def test_all_schema(self): + # create console + # find agents + # synchronous query for all schemas + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for agent_app in self.agents: + agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) + self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) + + # get a list of all schema_ids + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) + schema_list = self.console.do_query(agent, query) + self.assertTrue(schema_list and + len(schema_list) == _SCHEMAS_PER_AGENT) + for schema in schema_list: + self.assertTrue(isinstance(schema, SchemaObjectClass)) + + self.console.destroy(10) + + + def test_all_object_id(self): + # create console + # find agents + # synchronous query for all object_ids by schema_id + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for agent_app in self.agents: + agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) + self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) + + # get a list of all schema_ids + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT) + for sid in sid_list: + query = QmfQuery.create_wildcard_object_id(sid) + oid_list = self.console.do_query(agent, query) + self.assertTrue(oid_list and + len(oid_list) == _OBJS_PER_AGENT) + for oid in oid_list: + self.assertTrue(isinstance(oid, basestring)) + + self.console.destroy(10) + + + def test_all_objects(self): + # create console + # find agents + # synchronous query for all objects by schema_id + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for agent_app in self.agents: + agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) + self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) + + # get a list of all schema_ids + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT) + for sid in sid_list: + query = QmfQuery.create_wildcard_object(sid) + obj_list = self.console.do_query(agent, query) + self.assertTrue(obj_list and + len(obj_list) == _OBJS_PER_AGENT) + for obj in obj_list: + self.assertTrue(isinstance(obj, + qmf2.console.QmfConsoleData)) + + self.console.destroy(10) |