diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-24 19:40:31 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-24 19:40:31 +0000 |
commit | dd248a04512bfcc14b4d9e3e1ee0e18cc66c568c (patch) | |
tree | 2336378209d61c90121d812076ba57e84488c72f | |
parent | 0e89a331de0d54b34de5bbe2531e3e48b272775f (diff) | |
download | qpid-python-dd248a04512bfcc14b4d9e3e1ee0e18cc66c568c.tar.gz |
QPID-2261: sync with msg formats defined on wiki, start subscription impl.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@915946 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/agent.py | 673 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/common.py | 83 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/console.py | 528 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/__init__.py | 1 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py | 4 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/async_method.py | 2 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/async_query.py | 2 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/basic_method.py | 2 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/basic_query.py | 2 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/events.py | 2 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/multi_response.py | 4 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py | 2 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py | 446 |
13 files changed, 1410 insertions, 341 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/agent.py b/qpid/extras/qmf/src/py/qmf2/agent.py index a6b3c39ad1..069cf184ce 100644 --- a/qpid/extras/qmf/src/py/qmf2/agent.py +++ b/qpid/extras/qmf/src/py/qmf2/agent.py @@ -21,18 +21,19 @@ import logging import datetime import time import Queue -from threading import Thread, Lock, currentThread, Event +from threading import Thread, RLock, currentThread, Event from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 -from common import (make_subject, parse_subject, OpCode, QmfQuery, - SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, WorkItem, SchemaMethod, - timedelta_to_secs) +from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass, + QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem, + SchemaMethod, timedelta_to_secs, QMF_APP_ID) # global flag that indicates which thread (if any) is # running the agent notifier callback _callback_thread=None + + ##============================================================================== ## METHOD CALL ##============================================================================== @@ -78,14 +79,73 @@ class MethodCallParams(object): return self._user_id + ##============================================================================== + ## SUBSCRIPTIONS + ##============================================================================== + +class _ConsoleHandle(object): + """ + """ + def __init__(self, handle, reply_to): + self.console_handle = handle + self.reply_to = reply_to + +class SubscriptionParams(object): + """ + """ + def __init__(self, console_handle, query, interval, duration, user_id): + self._console_handle = console_handle + self._query = query + self._interval = interval + self._duration = duration + self._user_id = user_id + + def get_console_handle(self): + return self._console_handle + + def get_query(self): + return self._query + + def get_interval(self): + return self._interval + + def get_duration(self): + return self._duration + + def get_user_id(self): + return self._user_id + +class _SubscriptionState(object): + """ + An internally-managed subscription. + """ + def __init__(self, reply_to, cid, query, interval, duration): + self.reply_to = reply_to + self.correlation_id = cid + self.query = query + self.interval = interval + self.duration = duration + now = datetime.datetime.utcnow() + self.next_update = now # do an immediate update + self.expiration = now + datetime.timedelta(seconds=duration) + self.id = 0 + + def resubscribe(self, now, _duration=None): + if _duration is not None: + self.duration = _duration + self.expiration = now + datetime.timedelta(seconds=self.duration) + + def reset_interval(self, now): + self.next_update = now + datetime.timedelta(seconds=self.interval) + + ##============================================================================== ## AGENT ##============================================================================== class Agent(Thread): - def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, - _max_msg_size=0, _capacity=10): + def __init__(self, name, _domain=None, _notifier=None, **options): Thread.__init__(self) self._running = False self._ready = Event() @@ -94,11 +154,20 @@ class Agent(Thread): self._domain = _domain self._address = QmfAddress.direct(self.name, self._domain) self._notifier = _notifier - self._heartbeat_interval = _heartbeat_interval + + # configurable parameters + # + self._heartbeat_interval = options.get("heartbeat_interval", 30) + self._capacity = options.get("capacity", 10) + self._default_duration = options.get("default_duration", 300) + self._max_duration = options.get("max_duration", 3600) + self._min_duration = options.get("min_duration", 10) + self._default_interval = options.get("default_interval", 30) + self._min_interval = options.get("min_interval", 5) + # @todo: currently, max # of objects in a single reply message, would # be better if it were max bytesize of per-msg content... - self._max_msg_size = _max_msg_size - self._capacity = _capacity + self._max_msg_size = options.get("max_msg_size", 0) self._conn = None self._session = None @@ -107,7 +176,7 @@ class Agent(Thread): self._direct_sender = None self._topic_sender = None - self._lock = Lock() + self._lock = RLock() self._packages = {} self._schema_timestamp = long(0) self._schema = {} @@ -119,6 +188,10 @@ class Agent(Thread): self._undescribed_data = {} self._work_q = Queue.Queue() self._work_q_put = False + # subscriptions + self._subscription_id = long(time.time()) + self._subscriptions = {} + self._next_subscribe_event = None def destroy(self, timeout=None): @@ -192,10 +265,11 @@ class Agent(Thread): if self.isAlive(): # kick my thread to wake it up try: - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.noop)}, + msg = Message(id=QMF_APP_ID, subject=self.name, - content={"noop":"noop"}) + properties={ "method":"request", + "qmf.opcode":OpCode.noop}, + content={}) # TRACE #logging.error("!!! sending wakeup to myself: %s" % msg) @@ -258,13 +332,14 @@ class Agent(Thread): raise Exception("No connection available") # @todo: should we validate against the schema? - _map = {"_name": self.get_name(), - "_event": qmfEvent.map_encode()} - msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." + + msg = Message(id=QMF_APP_ID, + subject=QmfAddress.SUBJECT_AGENT_EVENT + "." + qmfEvent.get_severity() + "." + self.name, - properties={"method":"response", - "qmf.subject":make_subject(OpCode.event_ind)}, - content={MsgKey.event:_map}) + properties={"method":"indication", + "qmf.opcode":OpCode.data_ind, + "qmf.content": ContentType.event, + "qmf.agent":self.name}, + content=[qmfEvent.map_encode()]) # TRACE # logging.error("!!! Agent %s sending Event (%s)" % # (self.name, str(msg))) @@ -330,9 +405,10 @@ class Agent(Thread): raise TypeError("Invalid type for error - must be QmfData") _map[SchemaMethod.KEY_ERROR] = _error.map_encode() - msg = Message( properties={"method":"response", - "qmf.subject":make_subject(OpCode.response)}, - content={MsgKey.method:_map}) + msg = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.method_rsp}, + content=_map) msg.correlation_id = handle.correlation_id self._send_reply(msg, handle.reply_to) @@ -370,25 +446,10 @@ class Agent(Thread): while self._running: - now = datetime.datetime.utcnow() - # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) - if now >= next_heartbeat: - ind = self._makeAgentIndMsg() - ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT - # TRACE - #logging.error("!!! Agent %s sending Heartbeat (%s)" % - # (self.name, str(ind))) - self._topic_sender.send(ind) - logging.debug("Agent Indication Sent") - next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) - - timeout = timedelta_to_secs(next_heartbeat - now) - # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) - try: - self._session.next_receiver(timeout=timeout) - except Empty: - continue - + # + # Process inbound messages + # + logging.debug("%s processing inbound messages..." % self.name) for i in range(batch_limit): try: msg = self._topic_receiver.fetch(timeout=0) @@ -409,7 +470,71 @@ class Agent(Thread): # (self.name, self._direct_receiver.source, msg)) self._dispatch(msg, _direct=True) + # + # Send Heartbeat Notification + # + now = datetime.datetime.utcnow() + if now >= next_heartbeat: + logging.debug("%s sending heartbeat..." % self.name) + ind = Message(id=QMF_APP_ID, + subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT, + properties={"method":"indication", + "qmf.opcode":OpCode.agent_heartbeat_ind, + "qmf.agent":self.name}, + content=self._makeAgentInfoBody()) + # TRACE + #logging.error("!!! Agent %s sending Heartbeat (%s)" % + # (self.name, str(ind))) + self._topic_sender.send(ind) + logging.debug("Agent Indication Sent") + next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) + + + + # + # Monitor Subscriptions + # + if (self._next_subscribe_event is None or + now >= self._next_subscribe_event): + + logging.debug("%s polling subscriptions..." % self.name) + self._next_subscribe_event = now + datetime.timedelta(seconds= + self._max_duration) + self._lock.acquire() + try: + dead_ss = [] + for sid,ss in self._subscriptions.iteritems(): + if now >= ss.expiration: + dead_ss.append(sid) + continue + if now >= ss.next_update: + response = [] + objs = self._queryData(ss.query) + if objs: + for obj in objs: + response.append(obj.map_encode()) + logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id)) + self._send_query_response( ContentType.data, + ss.correlation_id, + ss.reply_to, + response) + ss.reset_interval(now) + + next_timeout = min(ss.expiration, ss.next_update) + if next_timeout < self._next_subscribe_event: + self._next_subscribe_event = next_timeout + + for sid in dead_ss: + del self._subscriptions[sid] + finally: + self._lock.release() + + # + # notify application of pending WorkItems + # + if self._work_q_put and self._notifier: + logging.debug("%s notifying application..." % self.name) # new stuff on work queue, kick the the application... self._work_q_put = False _callback_thread = currentThread() @@ -417,19 +542,33 @@ class Agent(Thread): self._notifier.indication() _callback_thread = None + # + # Sleep until messages arrive or something times out + # + next_timeout = min(next_heartbeat, self._next_subscribe_event) + timeout = timedelta_to_secs(next_timeout - + datetime.datetime.utcnow()) + if timeout > 0.0: + logging.debug("%s sleeping %s seconds..." % (self.name, + timeout)) + try: + self._session.next_receiver(timeout=timeout) + except Empty: + pass + + + + # # Private: # - def _makeAgentIndMsg(self): + def _makeAgentInfoBody(self): """ - Create an agent indication message identifying this agent + Create an agent indication message body identifying this agent """ - _map = {"_name": self.get_name(), + return {"_name": self.get_name(), "_schema_timestamp": self._schema_timestamp} - return Message(properties={"method":"response", - "qmf.subject":make_subject(OpCode.agent_ind)}, - content={MsgKey.agent_info: _map}) def _send_reply(self, msg, reply_to): """ @@ -458,7 +597,7 @@ class Agent(Thread): except SendError, e: logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) - def _send_query_response(self, subject, msgkey, cid, reply_to, objects): + def _send_query_response(self, content_type, cid, reply_to, objects): """ Send a response to a query, breaking the result into multiple messages based on the agent's _max_msg_size config parameter @@ -472,24 +611,28 @@ class Agent(Thread): start = 0 end = min(total, max_count) - while end <= total: - m = Message(properties={"qmf.subject":subject, - "method":"response"}, + # send partial response if too many objects present + while end < total: + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "partial":None, + "qmf.opcode":OpCode.data_ind, + "qmf.content":content_type, + "qmf.agent":self.name}, correlation_id = cid, - content={msgkey:objects[start:end]}) + content=objects[start:end]) self._send_reply(m, reply_to) - if end == total: - break; start = end end = min(total, end + max_count) - # response terminator - last message has empty object array - if total: - m = Message(properties={"qmf.subject":subject, - "method":"response"}, - correlation_id = cid, - content={msgkey: []} ) - self._send_reply(m, reply_to) + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.data_ind, + "qmf.content":content_type, + "qmf.agent":self.name}, + correlation_id = cid, + content=objects[start:end]) + self._send_reply(m, reply_to) def _dispatch(self, msg, _direct=False): """ @@ -497,33 +640,32 @@ class Agent(Thread): @param _direct: True if msg directly addressed to this agent. """ - logging.debug( "Message received from Console! [%s]" % msg ) - try: - version,opcode = parse_subject(msg.properties.get("qmf.subject")) - except: - logging.warning("Ignoring unrecognized message '%s'" % msg.subject) - return + # logging.debug( "Message received from Console! [%s]" % msg ) + # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) ) + opcode = msg.properties.get("qmf.opcode") + if not opcode: + logging.warning("Ignoring unrecognized message '%s'" % msg) + return + version = 2 # @todo: fix me cmap = {}; props={} if msg.content_type == "amqp/map": cmap = msg.content if msg.properties: props = msg.properties - if opcode == OpCode.agent_locate: + if opcode == OpCode.agent_locate_req: self._handleAgentLocateMsg( msg, cmap, props, version, _direct ) - elif opcode == OpCode.get_query: + elif opcode == OpCode.query_req: self._handleQueryMsg( msg, cmap, props, version, _direct ) elif opcode == OpCode.method_req: self._handleMethodReqMsg(msg, cmap, props, version, _direct) - elif opcode == OpCode.cancel_subscription: - logging.warning("!!! CANCEL_SUB TBD !!!") - elif opcode == OpCode.create_subscription: - logging.warning("!!! CREATE_SUB TBD !!!") - elif opcode == OpCode.renew_subscription: - logging.warning("!!! RENEW_SUB TBD !!!") - elif opcode == OpCode.schema_query: - logging.warning("!!! SCHEMA_QUERY TBD !!!") + elif opcode == OpCode.subscribe_req: + self._handleSubscribeReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.subscribe_refresh_req: + self._handleResubscribeReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.subscribe_cancel_ind: + self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.noop: logging.debug("No-op msg received.") else: @@ -536,18 +678,28 @@ class Agent(Thread): """ logging.debug("_handleAgentLocateMsg") - reply = True - if "method" in props and props["method"] == "request": - query = cmap.get(MsgKey.query) - if query is not None: - # fake a QmfData containing my identifier for the query compare - tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME: - self.get_name()}, - _object_id="my-name") - reply = QmfQuery.from_map(query).evaluate(tmpData) + reply = False + if props.get("method") == "request": + # if the message is addressed to me or wildcard, process it + if (msg.subject == "console.ind" or + msg.subject == "console.ind.locate" or + msg.subject == "console.ind.locate." + self.name): + pred = msg.content + if not pred: + reply = True + elif isinstance(pred, type([])): + # fake a QmfData containing my identifier for the query compare + query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred) + tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME: + self.get_name()}, + _object_id="my-name") + reply = query.evaluate(tmpData) if reply: - m = self._makeAgentIndMsg() + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.agent_locate_rsp}, + content=self._makeAgentInfoBody()) m.correlation_id = msg.correlation_id self._send_reply(m, msg.reply_to) else: @@ -561,22 +713,25 @@ class Agent(Thread): logging.debug("_handleQueryMsg") if "method" in props and props["method"] == "request": - qmap = cmap.get(MsgKey.query) - if qmap: - query = QmfQuery.from_map(qmap) + if cmap: + try: + query = QmfQuery.from_map(cmap) + except TypeError: + logging.error("Invalid Query format: '%s'" % str(cmap)) + return target = query.get_target() if target == QmfQuery.TARGET_PACKAGES: - self._queryPackages( msg, query ) + self._queryPackagesReply( msg, query ) elif target == QmfQuery.TARGET_SCHEMA_ID: - self._querySchema( msg, query, _idOnly=True ) + self._querySchemaReply( msg, query, _idOnly=True ) elif target == QmfQuery.TARGET_SCHEMA: - self._querySchema( msg, query) + self._querySchemaReply( msg, query) elif target == QmfQuery.TARGET_AGENT: logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!") elif target == QmfQuery.TARGET_OBJECT_ID: - self._queryData(msg, query, _idOnly=True) + self._queryDataReply(msg, query, _idOnly=True) elif target == QmfQuery.TARGET_OBJECT: - self._queryData(msg, query) + self._queryDataReply(msg, query) else: logging.warning("Unrecognized query target: '%s'" % str(target)) @@ -634,7 +789,159 @@ class Agent(Thread): self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) self._work_q_put = True - def _queryPackages(self, msg, query): + def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Subscription Request + """ + if "method" in props and props["method"] == "request": + query_map = cmap.get("_query") + interval = cmap.get("_interval") + duration = cmap.get("_duration") + + try: + query = QmfQuery.from_map(query_map) + except TypeError: + logging.warning("Invalid query for subscription: %s" % + str(query_map)) + return + + if isinstance(self, AgentExternal): + # param = SubscriptionParams(_ConsoleHandle(console_handle, + # msg.reply_to), + # query, + # interval, + # duration, + # msg.user_id) + # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST, + # msg.correlation_id, param)) + # self._work_q_put = True + logging.error("External Subscription TBD") + return + + # validate the query - only specific objects, or + # objects wildcard, are currently supported. + if (query.get_target() != QmfQuery.TARGET_OBJECT or + (query.get_selector() == QmfQuery.PREDICATE and + query.get_predicate())): + logging.error("Subscriptions only support (wildcard) Object" + " Queries.") + err = QmfData.create( + {"reason": "Unsupported Query type for subscription.", + "query": str(query.map_encode())}) + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_rsp}, + correlation_id = msg.correlation_id, + content={"_error": err.map_encode()}) + self._send_reply(m, msg.reply_to) + return + + if duration is None: + duration = self._default_duration + else: + try: + duration = float(duration) + if duration > self._max_duration: + duration = self._max_duration + elif duration < self._min_duration: + duration = self._min_duration + except: + logging.warning("Bad duration value: %s" % str(msg)) + duration = self._default_duration + + if interval is None: + interval = self._default_interval + else: + try: + interval = float(interval) + if interval < self._min_interval: + interval = self._min_interval + except: + logging.warning("Bad interval value: %s" % str(msg)) + interval = self._default_interval + + ss = _SubscriptionState(msg.reply_to, + msg.correlation_id, + query, + interval, + duration) + self._lock.acquire() + try: + sid = self._subscription_id + self._subscription_id += 1 + ss.id = sid + self._subscriptions[sid] = ss + self._next_subscribe_event = None + finally: + self._lock.release() + + sr_map = {"_subscription_id": sid, + "_interval": interval, + "_duration": duration} + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_rsp}, + correlation_id = msg.correlation_id, + content=sr_map) + self._send_reply(m, msg.reply_to) + + + + def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Renew Subscription Request + """ + if props.get("method") == "request": + sid = cmap.get("_subscription_id") + if not sid: + logging.debug("Invalid subscription refresh msg: %s" % + str(msg)) + return + + self._lock.acquire() + try: + ss = self._subscriptions.get(sid) + if not ss: + logging.debug("Ignoring unknown subscription: %s" % + str(sid)) + return + duration = cmap.get("_duration") + if duration is not None: + try: + duration = float(duration) + if duration > self._max_duration: + duration = self._max_duration + elif duration < self._min_duration: + duration = self._min_duration + except: + logging.debug("Bad duration value: %s" % str(msg)) + duration = None # use existing duration + + ss.resubscribe(datetime.datetime.utcnow(), duration) + + finally: + self._lock.release() + + + def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Cancel Subscription Request + """ + if props.get("method") == "request": + sid = cmap.get("_subscription_id") + if not sid: + logging.warning("No subscription id supplied: %s" % msg) + return + + self._lock.acquire() + try: + if sid in self._subscriptions: + del self._subscriptions[sid] + finally: + self._lock.release() + + + def _queryPackagesReply(self, msg, query): """ Run a query against the list of known packages """ @@ -646,58 +953,83 @@ class Agent(Thread): _object_id="_package") if query.evaluate(qmfData): pnames.append(name) + + self._send_query_response(ContentType.schema_package, + msg.correlation_id, + msg.reply_to, + pnames) finally: self._lock.release() - self._send_query_response(make_subject(OpCode.data_ind), - MsgKey.package_info, - msg.correlation_id, - msg.reply_to, - pnames) - def _querySchema( self, msg, query, _idOnly=False ): + def _querySchemaReply( self, msg, query, _idOnly=False ): """ """ schemas = [] - # if querying for a specific schema, do a direct lookup - if query.get_selector() == QmfQuery.ID: - found = None - self._lock.acquire() - try: + + self._lock.acquire() + try: + # if querying for a specific schema, do a direct lookup + if query.get_selector() == QmfQuery.ID: found = self._schema.get(query.get_id()) - finally: - self._lock.release() - if found: - if _idOnly: - schemas.append(query.get_id().map_encode()) - else: - schemas.append(found.map_encode()) - else: # otherwise, evaluate all schema - self._lock.acquire() - try: + if found: + if _idOnly: + schemas.append(query.get_id().map_encode()) + else: + schemas.append(found.map_encode()) + else: # otherwise, evaluate all schema for sid,val in self._schema.iteritems(): if query.evaluate(val): if _idOnly: schemas.append(sid.map_encode()) else: schemas.append(val.map_encode()) - finally: - self._lock.release() + if _idOnly: + msgkey = ContentType.schema_id + else: + msgkey = ContentType.schema_class - if _idOnly: - msgkey = MsgKey.schema_id - else: - msgkey = MsgKey.schema + self._send_query_response(msgkey, + msg.correlation_id, + msg.reply_to, + schemas) + finally: + self._lock.release() - self._send_query_response(make_subject(OpCode.data_ind), - msgkey, - msg.correlation_id, - msg.reply_to, - schemas) + def _queryDataReply( self, msg, query, _idOnly=False ): + """ + """ + # hold the (recursive) lock for the duration so the Agent + # won't send data that is currently being modified by the + # app. + self._lock.acquire() + try: + response = [] + data_objs = self._queryData(query) + if _idOnly: + for obj in data_objs: + response.append(obj.get_object_id()) + else: + for obj in data_objs: + response.append(obj.map_encode()) + + if _idOnly: + msgkey = ContentType.object_id + else: + msgkey = ContentType.data - def _queryData( self, msg, query, _idOnly=False ): + self._send_query_response(msgkey, + msg.correlation_id, + msg.reply_to, + response) + finally: + self._lock.release() + + + def _queryData(self, query): """ + Return a list of QmfData objects that match a given query """ data_objs = [] # extract optional schema_id from target params @@ -705,12 +1037,12 @@ class Agent(Thread): t_params = query.get_target_param() if t_params: sid = t_params.get(QmfData.KEY_SCHEMA_ID) - # if querying for a specific object, do a direct lookup - if query.get_selector() == QmfQuery.ID: - oid = query.get_id() - found = None - self._lock.acquire() - try: + + self._lock.acquire() + try: + # if querying for a specific object, do a direct lookup + if query.get_selector() == QmfQuery.ID: + oid = query.get_id() if sid and not sid.get_hash_string(): # wildcard schema_id match, check each schema for name,db in self._described_data.iteritems(): @@ -718,11 +1050,9 @@ class Agent(Thread): and name.get_package_name() == sid.get_package_name()): found = db.get(oid) if found: - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(found.map_encode()) + data_objs.append(found) else: + found = None if sid: db = self._described_data.get(sid) if db: @@ -730,15 +1060,9 @@ class Agent(Thread): else: found = self._undescribed_data.get(oid) if found: - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(found.map_encode()) - finally: - self._lock.release() - else: # otherwise, evaluate all data - self._lock.acquire() - try: + data_objs.append(found) + + else: # otherwise, evaluate all data if sid and not sid.get_hash_string(): # wildcard schema_id match, check each schema for name,db in self._described_data.iteritems(): @@ -746,10 +1070,7 @@ class Agent(Thread): and name.get_package_name() == sid.get_package_name()): for oid,data in db.iteritems(): if query.evaluate(data): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(data.map_encode()) + data_objs.append(data) else: if sid: db = self._described_data.get(sid) @@ -759,23 +1080,28 @@ class Agent(Thread): if db: for oid,data in db.iteritems(): if query.evaluate(data): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(data.map_encode()) - finally: - self._lock.release() + data_objs.append(data) + finally: + self._lock.release() - if _idOnly: - msgkey = MsgKey.object_id - else: - msgkey = MsgKey.data_obj + return data_objs - self._send_query_response(make_subject(OpCode.data_ind), - msgkey, - msg.correlation_id, - msg.reply_to, - data_objs) + + + ##============================================================================== + ## EXTERNAL DATABASE AGENT + ##============================================================================== + +class AgentExternal(Agent): + """ + An Agent which uses an external management database. + """ + def __init__(self, name, _domain=None, _notifier=None, + _heartbeat_interval=30, _max_msg_size=0, _capacity=10): + super(AgentExternal, self).__init__(name, _domain, _notifier, + _heartbeat_interval, + _max_msg_size, _capacity) + logging.error("AgentExternal TBD") @@ -823,9 +1149,11 @@ class QmfAgentData(QmfData): _schema_id=schema_id, _const=False) self._agent = agent self._validated = False + self._modified = True def destroy(self): self._dtime = long(time.time() * 1000) + self._touch() # @todo: publish change def is_deleted(self): @@ -833,6 +1161,7 @@ class QmfAgentData(QmfData): def set_value(self, _name, _value, _subType=None): super(QmfAgentData, self).set_value(_name, _value, _subType) + self._touch() # @todo: publish change def inc_value(self, name, delta=1): @@ -849,6 +1178,7 @@ class QmfAgentData(QmfData): """ subtract the delta from the property """ # @todo: need to take write-lock logging.error(" TBD!!!") + self._touch() def validate(self): """ @@ -868,6 +1198,13 @@ class QmfAgentData(QmfData): raise Exception("Required property '%s' not present." % name) self._validated = True + def _touch(self): + """ + Mark this object as modified. Used to force a publish of this object + if on subscription. + """ + self._modified = True + ################################################################################ diff --git a/qpid/extras/qmf/src/py/qmf2/common.py b/qpid/extras/qmf/src/py/qmf2/common.py index 8107b86666..8070add806 100644 --- a/qpid/extras/qmf/src/py/qmf2/common.py +++ b/qpid/extras/qmf/src/py/qmf2/common.py @@ -34,61 +34,44 @@ log_query = getLogger("qmf.query") ## Constants ## -AMQP_QMF_SUBJECT = "qmf" -AMQP_QMF_VERSION = 4 -AMQP_QMF_SUBJECT_FMT = "%s%d.%s" - -class MsgKey(object): - agent_info = "agent_info" - query = "query" - package_info = "package_info" - schema_id = "schema_id" - schema = "schema" - object_id="object_id" - data_obj="object" - method="method" - event="event" +QMF_APP_ID="qmf2" -class OpCode(object): - noop = "noop" - - # codes sent by a console and processed by the agent - agent_locate = "agent-locate" - cancel_subscription = "cancel-subscription" - create_subscription = "create-subscription" - get_query = "get-query" - method_req = "method" - renew_subscription = "renew-subscription" - schema_query = "schema-query" # @todo: deprecate - - # codes sent by the agent to a console - agent_ind = "agent" - data_ind = "data" - event_ind = "event" - managed_object = "managed-object" - object_ind = "object" - response = "response" - schema_ind="schema" # @todo: deprecate +class ContentType(object): + """ Values for the 'qmf.content' message header + """ + schema_package = "_schema_package" + schema_id = "_schema_id" + schema_class = "_schema_class" + object_id = "_object_id" + data = "_data" + event = "_event" +class OpCode(object): + """ Values for the 'qmf.opcode' message header. + """ + noop = "_noop" + # codes sent by a console and processed by the agent + agent_locate_req = "_agent_locate_request" + subscribe_req = "_subscribe_request" + subscribe_cancel_ind = "_subscribe_cancel_indication" + subscribe_refresh_req = "_subscribe_refresh_indication" + query_req = "_query_request" + method_req = "_method_request" -def make_subject(_code): - """ - Create a message subject field value. - """ - return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code) + # codes sent by the agent to a console + agent_locate_rsp = "_agent_locate_response" + agent_heartbeat_ind = "_agent_heartbeat_indication" + query_rsp = "_query_response" + subscribe_rsp = "_subscribe_response" + subscribe_refresh_rsp = "_subscribe_refresh_response" + data_ind = "_data_indication" + method_rsp = "_method_response" -def parse_subject(_sub): - """ - Deconstruct a subject field, return version,opcode values - """ - if _sub[:3] != "qmf": - raise Exception("Non-QMF message received") - return _sub[3:].split('.', 1) def timedelta_to_secs(td): """ @@ -133,11 +116,15 @@ class WorkItem(object): AGENT_HEARTBEAT=8 QUERY_COMPLETE=9 METHOD_RESPONSE=10 + SUBSCRIBE_RESPONSE=11 + SUBSCRIBE_INDICATION=12 + RESUBSCRIBE_RESPONSE=13 # Enumeration of the types of WorkItems produced on the Agent METHOD_CALL=1000 QUERY=1001 - SUBSCRIBE=1002 - UNSUBSCRIBE=1003 + SUBSCRIBE_REQUEST=1002 + RESUBSCRIBE_REQUEST=1003 + UNSUBSCRIBE_REQUEST=1004 def __init__(self, kind, handle, _params=None): """ diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py index c13cf70755..d8b625068e 100644 --- a/qpid/extras/qmf/src/py/qmf2/console.py +++ b/qpid/extras/qmf/src/py/qmf2/console.py @@ -24,14 +24,14 @@ import time import datetime import Queue from threading import Thread, Event -from threading import Lock +from threading import RLock from threading import currentThread from threading import Condition from qpid.messaging import Connection, Message, Empty, SendError -from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, - MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, +from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType, + QmfData, QmfAddress, SchemaClass, SchemaClassId, SchemaEventClass, SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent, timedelta_to_secs) @@ -141,6 +141,7 @@ class _AsyncMailbox(_Mailbox): console._lock.acquire() try: console._async_mboxes[self.cid] = self + console._next_mbox_expire = None finally: console._lock.release() @@ -177,7 +178,7 @@ class _QueryMailbox(_AsyncMailbox): def __init__(self, console, agent_name, context, - target, msgkey, + target, _timeout=None): """ Invoked by application thread. @@ -186,7 +187,6 @@ class _QueryMailbox(_AsyncMailbox): _timeout) self.agent_name = agent_name self.target = target - self.msgkey = msgkey self.context = context self.result = [] @@ -195,11 +195,8 @@ class _QueryMailbox(_AsyncMailbox): Process query response messages delivered to this mailbox. Invoked by Console Management thread only. """ - done = False - objects = reply.content.get(self.msgkey) - if not objects: - done = True - else: + objects = reply.content + if isinstance(objects, type([])): # convert from map to native types if needed if self.target == QmfQuery.TARGET_SCHEMA_ID: for sid_map in objects: @@ -237,8 +234,7 @@ class _QueryMailbox(_AsyncMailbox): # no conversion needed. self.result += objects - if done: - # create workitem + if not "partial" in reply.properties: # logging.error("QUERY COMPLETE for %s" % str(self.context)) wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) @@ -278,8 +274,8 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): Process schema response messages. """ done = False - schemas = reply.content.get(MsgKey.schema) - if schemas: + schemas = reply.content + if schemas and isinstance(schemas, type([])): for schema_map in schemas: # extract schema id, convert based on schema type sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) @@ -319,8 +315,8 @@ class _MethodMailbox(_AsyncMailbox): Invoked by Console Management thread only. """ - _map = reply.content.get(MsgKey.method) - if not _map: + _map = reply.content + if not _map or not isinstance(_map, type({})): logging.error("Invalid method call reply message") result = None else: @@ -355,6 +351,128 @@ class _MethodMailbox(_AsyncMailbox): +class _SubscriptionMailbox(_AsyncMailbox): + """ + A Mailbox for a single subscription. + """ + def __init__(self, console, lifetime, context, agent): + """ + Invoked by application thread. + """ + super(_SubscriptionMailbox, self).__init__(console, lifetime) + self.cv = Condition() + self.data = [] + self.result = [] + self.context = context + self.agent_name = agent.get_name() + self.agent_subscription_id = None # from agent + + def deliver(self, msg): + """ + """ + opcode = msg.properties.get("qmf.opcode") + if (opcode == OpCode.subscribe_rsp or + opcode == OpCode.subscribe_refresh_rsp): + # + # sync only - just deliver the msg + # + self.cv.acquire() + try: + self.data.append(msg) + # if was empty, notify waiters + if len(self.data) == 1: + self.cv.notify() + finally: + self.cv.release() + return + + # sid = msg.content.get("_subscription_id") + # lifetime = msg.content.get("_duration") + # error = msg.content.get("_error") + # sp = SubscribeParams(sid, + # msg.content.get("_interval"), + # lifetime, error) + # if sid and self.subscription_id is None: + # self.subscription_id = sid + # if lifetime: + # self.console._lock.acquire() + # try: + # self.expiration_date = (datetime.datetime.utcnow() + + # datetime.timedelta(seconds=lifetime)) + # finally: + # self.console._lock.release() + + # if self.waiting: + # self.cv.acquire() + # try: + # self.data.append(sp) + # # if was empty, notify waiters + # if len(self._data) == 1: + # self._cv.notify() + # finally: + # self._cv.release() + # else: + # if opcode == OpCode.subscribe_rsp: + # wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, + # self.context, sp) + # else: + # wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, + # self.context, sp) + # self.console._work_q.put(wi) + # self.console._work_q_put = True + # if error: + # self.destroy() + + agent_name = msg.properties.get("qmf.agent") + if not agent_name: + logging.warning("Ignoring data_ind - no agent name given: %s" % + msg) + return + agent = self.console.get_agent(agent_name) + if not agent: + logging.warning("Ignoring data_ind - unknown agent '%s'" % + agent_name) + return + + objects = msg.content + for obj_map in objects: + obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self.console._prefetch_schema(sid, agent) + self.result.append(obj) + + if not "partial" in msg.properties: + wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result) + self.result = [] + self.console._work_q.put(wi) + self.console._work_q_put = True + + def fetch(self, timeout=None): + """ + Get one data item from a mailbox, with timeout. + Invoked by application thread. + """ + self.cv.acquire() + try: + if len(self.data) == 0: + self.cv.wait(timeout) + if len(self.data): + return self.data.pop(0) + return None + finally: + self.cv.release() + + def expire(self): + """ The subscription expired. + """ + self.destroy() + + + + + ##============================================================================== ## DATA MODEL ##============================================================================== @@ -481,8 +599,8 @@ class QmfConsoleData(QmfData): logging.debug("Agent method req wait timed-out.") return None - _map = replyMsg.content.get(MsgKey.method) - if not _map: + _map = replyMsg.content + if not _map or not isinstance(_map, type({})): logging.error("Invalid method call reply message") return None @@ -650,8 +768,8 @@ class Agent(object): logging.debug("Agent method req wait timed-out.") return None - _map = replyMsg.content.get(MsgKey.method) - if not _map: + _map = replyMsg.content + if not _map or not isinstance(_map, type({})): logging.error("Invalid method call reply message") return None @@ -676,20 +794,66 @@ class Agent(object): def _send_query(self, query, correlation_id=None): """ """ - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.get_query)}, - content={MsgKey.query: query.map_encode()}) + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.query_req}, + content=query.map_encode()) self._send_msg( msg, correlation_id ) def _send_method_req(self, mr_map, correlation_id=None): """ """ - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.method_req)}, + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.method_req}, content=mr_map) self._send_msg( msg, correlation_id ) + def _send_subscribe_req(self, query, correlation_id, _interval=None, + _lifetime=None): + """ + """ + sr_map = {"_query":query.map_encode()} + if _interval is not None: + sr_map["_interval"] = _interval + if _lifetime is not None: + sr_map["_duration"] = _lifetime + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_req}, + content=sr_map) + self._send_msg(msg, correlation_id) + + + def _send_resubscribe_req(self, correlation_id, + subscription_id, + _lifetime=None): + """ + """ + sr_map = {"_subscription_id":subscription_id} + if _lifetime is not None: + sr_map["_duration"] = _lifetime + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_refresh_req}, + content=sr_map) + self._send_msg(msg, correlation_id) + + + def _send_unsubscribe_ind(self, correlation_id, subscription_id): + """ + """ + sr_map = {"_subscription_id":subscription_id} + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_cancel_ind}, + content=sr_map) + self._send_msg(msg, correlation_id) + ##============================================================================== ## METHOD CALL @@ -716,6 +880,36 @@ class MethodResult(object): return arg + + ##============================================================================== + ## SUBSCRIPTION + ##============================================================================== + +class SubscribeParams(object): + """ Represents a standing subscription for this console. + """ + def __init__(self, sid, interval, duration, _error=None): + self._sid = sid + self._interval = interval + self._duration = duration + self._error = _error + + def succeeded(self): + return self._error is None + + def get_error(self): + return self._error + + def get_subscription_id(self): + return self._sid + + def get_publish_interval(self): + return self._interval + + def get_duration(self): + return self._duration + + ##============================================================================== ## CONSOLE ##============================================================================== @@ -753,7 +947,7 @@ class Console(Thread): self._domain = _domain self._address = QmfAddress.direct(self._name, self._domain) self._notifier = notifier - self._lock = Lock() + self._lock = RLock() self._conn = None self._session = None # dict of "agent-direct-address":class Agent entries @@ -766,6 +960,7 @@ class Console(Thread): self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout + self._subscribe_timeout = 300 # @todo: parameterize self._next_agent_expire = None self._next_mbox_expire = None # for passing WorkItems to the application @@ -776,18 +971,6 @@ class Console(Thread): self._post_office = {} # indexed by cid self._async_mboxes = {} # indexed by cid, used to expire them - ## Old stuff below??? - #self._broker_list = [] - #self.impl = qmfengine.Console() - #self._event = qmfengine.ConsoleEvent() - ##self._cv = Condition() - ##self._sync_count = 0 - ##self._sync_result = None - ##self._select = {} - ##self._cb_cond = Condition() - - - def destroy(self, timeout=None): """ Must be called before the Console is deleted. @@ -801,8 +984,6 @@ class Console(Thread): self.remove_connection(self._conn, timeout) logging.debug("Console Destroyed") - - def add_connection(self, conn): """ Add a AMQP connection to the console. The console will setup a session over the @@ -934,10 +1115,11 @@ class Console(Thread): cid = mbox.get_address() query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) - msg = Message(subject="console.ind.locate." + name, + msg = Message(id=QMF_APP_ID, + subject="console.ind.locate." + name, properties={"method":"request", - "qmf.subject":make_subject(OpCode.agent_locate)}, - content={MsgKey.query: query.map_encode()}) + "qmf.opcode":OpCode.agent_locate_req}, + content=query._predicate) msg.reply_to = str(self._address) msg.correlation_id = str(cid) logging.debug("Sending Agent Locate (%s)" % time.time()) @@ -995,23 +1177,13 @@ class Console(Thread): def do_query(self, agent, query, _reply_handle=None, _timeout=None ): """ """ - query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, - QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, - QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, - QmfQuery.TARGET_SCHEMA: MsgKey.schema, - QmfQuery.TARGET_OBJECT: MsgKey.data_obj, - QmfQuery.TARGET_AGENT: MsgKey.agent_info} - target = query.get_target() - msgkey = query_keymap.get(target) - if not msgkey: - raise Exception("Invalid target for query: %s" % str(query)) if _reply_handle is not None: mbox = _QueryMailbox(self, agent.get_name(), _reply_handle, - target, msgkey, + target, _timeout) else: mbox = _SyncMailbox(self) @@ -1045,9 +1217,8 @@ class Console(Thread): logging.debug("Query wait timed-out.") break - objects = reply.content.get(msgkey) - if not objects: - # last response is empty + objects = reply.content + if not objects or not isinstance(objects, type([])): break # convert from map to native types if needed @@ -1081,21 +1252,154 @@ class Console(Thread): # no conversion needed. response += objects + if not "partial" in reply.properties: + # reply not broken up over multiple msgs + break + now = datetime.datetime.utcnow() mbox.destroy() return response + + def create_subscription(self, agent, query, console_handle, + _interval=None, _duration=None, + _reply_handle=None, _timeout=None): + if not _duration: + _duration = self._subscribe_timeout + + if _reply_handle is not None: + assert(False) # async TBD + else: + mbox = _SubscriptionMailbox(self, _duration, console_handle, agent) + + cid = mbox.get_address() + + try: + logging.debug("Sending Subscribe to Agent (%s)" % time.time()) + agent._send_subscribe_req(query, cid, _interval, _duration) + except SendError, e: + logging.error(str(e)) + mbox.destroy() + return None + + if _reply_handle is not None: + return True + + # wait for reply + if _timeout is None: + _timeout = self._reply_timeout + + logging.debug("Waiting for response to subscription (%s)" % _timeout) + # @todo: what if mbox expires here? + replyMsg = mbox.fetch(_timeout) + + if not replyMsg: + logging.debug("Subscription request wait timed-out.") + mbox.destroy() + return None + + error = replyMsg.content.get("_error") + if error: + mbox.destroy() + try: + e_map = QmfData.from_map(error) + except TypeError: + e_map = QmfData.create({"error":"Unknown error"}) + return SubscribeParams(None, None, None, e_map) + + mbox.agent_subscription_id = replyMsg.content.get("_subscription_id") + return SubscribeParams(mbox.get_address(), + replyMsg.content.get("_interval"), + replyMsg.content.get("_duration"), + None) + + def refresh_subscription(self, subscription_id, + _duration=None, + _reply_handle=None, _timeout=None): + if _reply_handle is not None: + assert(False) # async TBD + + mbox = self._get_mailbox(subscription_id) + if not mbox: + logging.warning("Subscription %s not found." % subscription_id) + return None + + agent = self.get_agent(mbox.agent_name) + if not agent: + logging.warning("Subscription %s agent %s not found." % + (mbox.agent_name, subscription_id)) + return None + + try: + logging.debug("Sending Subscribe to Agent (%s)" % time.time()) + agent._send_resubscribe_req(subscription_id, + mbox.agent_subscription_id, + _duration) + except SendError, e: + logging.error(str(e)) + # @todo ???? mbox.destroy() + return None + + if _reply_handle is not None: + return True + + # wait for reply + if _timeout is None: + _timeout = self._reply_timeout + + logging.debug("Waiting for response to subscription (%s)" % _timeout) + replyMsg = mbox.fetch(_timeout) + + if not replyMsg: + logging.debug("Subscription request wait timed-out.") + # @todo???? mbox.destroy() + return None + + error = replyMsg.content.get("_error") + if error: + # @todo mbox.destroy() + try: + e_map = QmfData.from_map(error) + except TypeError: + e_map = QmfData.create({"error":"Unknown error"}) + return SubscribeParams(None, None, None, e_map) + + return SubscribeParams(mbox.get_address(), + replyMsg.content.get("_interval"), + replyMsg.content.get("_duration"), + None) + + def cancel_subscription(self, subscription_id): + """ + """ + mbox = self._get_mailbox(subscription_id) + if not mbox: + return None + + agent = self.get_agent(mbox.agent_name) + if agent: + try: + logging.debug("Sending UnSubscribe to Agent (%s)" % time.time()) + agent._send_unsubscribe_ind(subscription_id, + mbox.agent_subscription_id) + except SendError, e: + logging.error(str(e)) + + mbox.destroy() + + def _wake_thread(self): """ Make the console management thread loop wakeup from its next_receiver sleep. """ logging.debug("Sending noop to wake up [%s]" % self._address) - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.noop)}, + msg = Message(id=QMF_APP_ID, subject=self._name, - content={"noop":"noop"}) + properties={"method":"request", + "qmf.opcode":OpCode.noop}, + content={}) try: self._direct_sender.send( msg, sync=True ) except SendError, e: @@ -1152,9 +1456,17 @@ class Console(Thread): # to expire, or a mailbox requrest to time out now = datetime.datetime.utcnow() next_expire = self._next_agent_expire - if (self._next_mbox_expire and - self._next_mbox_expire < next_expire): - next_expire = self._next_mbox_expire + + # the mailbox expire flag may be cleared by the + # app thread(s) + self._lock.acquire() + try: + if (self._next_mbox_expire and + self._next_mbox_expire < next_expire): + next_expire = self._next_mbox_expire + finally: + self._lock.release() + if next_expire > now: timeout = timedelta_to_secs(next_expire - now) try: @@ -1268,13 +1580,14 @@ class Console(Thread): """ PRIVATE: Process a message received from an Agent """ - logging.debug( "Message received from Agent! [%s]" % msg ) - try: - version,opcode = parse_subject(msg.properties.get("qmf.subject")) - # @todo: deal with version mismatch!!! - except: + #logging.debug( "Message received from Agent! [%s]" % msg ) + #logging.error( "Message received from Agent! [%s]" % msg ) + + opcode = msg.properties.get("qmf.opcode") + if not opcode: logging.error("Ignoring unrecognized message '%s'" % msg) return + version = 2 # @todo: fix me cmap = {}; props = {} if msg.content_type == "amqp/map": @@ -1282,20 +1595,21 @@ class Console(Thread): if msg.properties: props = msg.properties - if opcode == OpCode.agent_ind: + if opcode == OpCode.agent_heartbeat_ind: self._handle_agent_ind_msg( msg, cmap, version, _direct ) - elif opcode == OpCode.data_ind: - self._handle_data_ind_msg(msg, cmap, version, _direct) - elif opcode == OpCode.event_ind: - self._handle_event_ind_msg(msg, cmap, version, _direct) - elif opcode == OpCode.managed_object: - logging.warning("!!! managed_object TBD !!!") - elif opcode == OpCode.object_ind: - logging.warning("!!! object_ind TBD !!!") - elif opcode == OpCode.response: + elif opcode == OpCode.agent_locate_rsp: + self._handle_agent_ind_msg( msg, cmap, version, _direct ) + elif opcode == OpCode.query_rsp: self._handle_response_msg(msg, cmap, version, _direct) - elif opcode == OpCode.schema_ind: - logging.warning("!!! schema_ind TBD !!!") + elif opcode == OpCode.subscribe_rsp: + self._handle_response_msg(msg, cmap, version, _direct) + elif opcode == OpCode.method_rsp: + self._handle_response_msg(msg, cmap, version, _direct) + elif opcode == OpCode.data_ind: + if msg.correlation_id: + self._handle_response_msg(msg, cmap, version, _direct) + else: + self._handle_indication_msg(msg, cmap, version, _direct) elif opcode == OpCode.noop: logging.debug("No-op msg received.") else: @@ -1309,7 +1623,7 @@ class Console(Thread): """ logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) - ai_map = cmap.get(MsgKey.agent_info) + ai_map = msg.content if not ai_map or not isinstance(ai_map, type({})): logging.warning("Bad agent-ind message received: '%s'" % msg) return @@ -1359,29 +1673,10 @@ class Console(Thread): logging.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) - def _handle_data_ind_msg(self, msg, cmap, version, direct): - """ - Process a received data-ind message. - """ - logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) - - mbox = self._get_mailbox(msg.correlation_id) - if not mbox: - logging.debug("Data indicate received with unknown correlation_id" - " msg='%s'" % str(msg)) - return - - # wake up all waiters - logging.debug("waking waiters for correlation id %s" % - msg.correlation_id) - mbox.deliver(msg) - - def _handle_response_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ - # @todo code replication - clean me. logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) mbox = self._get_mailbox(msg.correlation_id) @@ -1394,19 +1689,22 @@ class Console(Thread): logging.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) - def _handle_event_ind_msg(self, msg, cmap, version, _direct): - ei_map = cmap.get(MsgKey.event) - if not ei_map or not isinstance(ei_map, type({})): - logging.warning("Bad event indication message received: '%s'" % msg) - return + def _handle_indication_msg(self, msg, cmap, version, _direct): - aname = ei_map.get("_name") - emap = ei_map.get("_event") + aname = msg.properties.get("qmf.agent") if not aname: - logging.debug("No '_name' field in event indication message.") + logging.debug("No agent name field in indication message.") return - if not emap: - logging.debug("No '_event' field in event indication message.") + + content_type = msg.properties.get("qmf.content") + if (content_type != ContentType.event or + not isinstance(msg.content, type([]))): + logging.warning("Bad event indication message received: '%s'" % msg) + return + + emap = msg.content[0] + if not isinstance(emap, type({})): + logging.debug("Invalid event body in indication message: '%s'" % msg) return agent = None @@ -1439,13 +1737,13 @@ class Console(Thread): """ Check all async mailboxes for outstanding requests that have expired. """ - now = datetime.datetime.utcnow() - if self._next_mbox_expire and now < self._next_mbox_expire: - return - expired_mboxes = [] - self._next_mbox_expire = None self._lock.acquire() try: + now = datetime.datetime.utcnow() + if self._next_mbox_expire and now < self._next_mbox_expire: + return + expired_mboxes = [] + self._next_mbox_expire = None for mbox in self._async_mboxes.itervalues(): if now >= mbox.expiration_date: expired_mboxes.append(mbox) diff --git a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py index 186f09349e..eff9357e1f 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py @@ -27,3 +27,4 @@ import events import multi_response import async_query import async_method +import subscriptions diff --git a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py index 59b65221e0..0e5e595695 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py @@ -52,8 +52,8 @@ class _agentApp(Thread): self.broker_url = broker_url self.notifier = _testNotifier() self.agent = qmf2.agent.Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) + _notifier=self.notifier, + heartbeat_interval=heartbeat) # No database needed for this test self.running = False self.ready = Event() diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py index 556b62756f..965a254f26 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py index 3a9a767bf0..7c7b22fdaf 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py index be2bdff9ab..22accb7cfc 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py index dd321cb4bb..be67c36d87 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/events.py b/qpid/extras/qmf/src/py/qmf2/tests/events.py index e55dc8572e..bc6465f25b 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/events.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/events.py @@ -58,7 +58,7 @@ class _agentApp(Thread): self.notifier = _testNotifier() self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py index d3d00a70c5..7c24435e79 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py @@ -60,8 +60,8 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat, - _max_msg_size=_MAX_OBJS_PER_MSG) + heartbeat_interval=heartbeat, + max_msg_size=_MAX_OBJS_PER_MSG) # Dynamically construct a management database for i in range(self.schema_count): diff --git a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py index 5b1446bb3a..466457d670 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py @@ -54,7 +54,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Management Database # - two different schema packages, diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py new file mode 100644 index 0000000000..d82f7855ba --- /dev/null +++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py @@ -0,0 +1,446 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +import datetime +import time +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, WorkItem) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.broker_url = broker_url + self.agent = Agent(name, + _notifier=self.notifier, + heartbeat_interval=heartbeat, + max_duration=10, + default_duration=7, + min_duration=5, + min_interval=1, + default_interval=2) + + # Management Database + # - two different schema packages, + # - two classes within one schema package + # - multiple objects per schema package+class + # - two "undescribed" objects + + # "package1/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"), + _desc="A test data schema - one", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, + _values={"key":"p1c1_key1"}, + _schema=_schema) + _obj.set_value("count1", 0) + _obj.set_value("count2", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, + _values={"key":"p1c1_key2"}, + _schema=_schema ) + _obj.set_value("count1", 9) + _obj.set_value("count2", 10) + self.agent.add_object( _obj ) + + # "package1/class2" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"), + _desc="A test data schema - two", + _object_id_names=["name"] ) + # add properties + _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, + _values={"name":"p1c2_name1"}, + _schema=_schema ) + _obj.set_value("string1", "a data string") + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, + _values={"name":"p1c2_name2"}, + _schema=_schema ) + _obj.set_value("string1", "another data string") + self.agent.add_object( _obj ) + + + # "package2/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"), + _desc="A test data schema - second package", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, + _values={"key":"p2c1_key1"}, + _schema=_schema ) + _obj.set_value("counter", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, + _values={"key":"p2c1_key2"}, + _schema=_schema ) + _obj.set_value("counter", 2112) + self.agent.add_object( _obj ) + + + # add two "unstructured" objects to the Agent + + _obj = QmfAgentData(self.agent, _object_id="undesc-1") + _obj.set_value("field1", "a value") + _obj.set_value("field2", 2) + _obj.set_value("field3", {"a":1, "map":2, "value":3}) + _obj.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj) + + + _obj = QmfAgentData(self.agent, _object_id="undesc-2") + _obj.set_value("key-1", "a value") + _obj.set_value("key-2", 2) + self.agent.add_object(_obj) + + self.running = False + self.ready = Event() + + def start_app(self): + self.running = True + self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + + def stop_app(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + + +class BaseTest(unittest.TestCase): + agent_count = 5 + + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + self.agents = [] + for i in range(self.agent_count): + agent = _agentApp("agent-" + str(i), self.broker, 1) + agent.start_app() + self.agents.append(agent) + #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow()) + + def tearDown(self): + #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow()) + for agent in self.agents: + if agent is not None: + agent.stop_app() + + + def test_sync_by_schema(self): + # create console + # find all agents + # subscribe to changes to any object in package1/class1 + # should succeed + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match all objects in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + _target_params=t_params) + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now subscribe to agent + + sp = self.console.create_subscription(agent, + query, + index) + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + self.assertTrue(sp.get_duration() == 10) + self.assertTrue(sp.get_publish_interval() == 2) + + subscriptions.append([sp, 0]) + index += 1 + + # now wait for the duration + interval + 1 and count the updates + r_count = 0 + while self.notifier.wait_for_work(10 + 2 + 1): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 2) + for obj in reply: + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == "p1c1_key2" or + obj.get_object_id() == "p1c1_key1") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package1") + self.assertTrue(sid.get_class_name() == "class1") + + self.assertTrue(wi.get_handle() < len(subscriptions)) + subscriptions[wi.get_handle()][1] += 1 + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 5 * len(subscriptions)) + for ii in range(len(subscriptions)): + self.assertTrue(subscriptions[ii][1] == 5) + + self.console.destroy(10) + + + def test_sync_by_obj_id(self): + # create console + # find all agents + # subscribe to changes to any object in package1/class1 + # should succeed + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match all objects in schema package1/class1 + # sid = SchemaClassId.create("package1", "class1") + # t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_id_object("undesc-2") + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now subscribe to agent + + sp = self.console.create_subscription(agent, + query, + index) + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + + subscriptions.append([sp, 0]) + index += 1 + + # now wait for all subscriptions to expire (duration + interval + 1 for + # luck) + r_count = 0 + while self.notifier.wait_for_work(10 + 2 + 1): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "undesc-2") + # print("!!! get_params() = %s" % wi.get_params()) + self.assertTrue(wi.get_handle() < len(subscriptions)) + subscriptions[wi.get_handle()][1] += 1 + # self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) + # self.assertTrue(reply.succeeded()) + # self.assertTrue(reply.get_argument("cookie") == + # wi.get_handle()) + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 5 * len(subscriptions)) + #for ii in range(len(subscriptions)): + # self.assertTrue(subscriptions[ii][1] == 5) + + self.console.destroy(10) + + + def test_sync_by_obj_id_schema(self): + # create console + # find all agents + # subscribe to changes to any object in package1/class1 + # should succeed + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now subscribe to agent + + sp = self.console.create_subscription(agent, + query, + index) + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + + subscriptions.append([sp, 0]) + index += 1 + + # now wait for all subscriptions to expire (duration + interval + 1 for + # luck) + r_count = 0 + while self.notifier.wait_for_work(10 + 2 + 1): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() < len(subscriptions)) + subscriptions[wi.get_handle()][1] += 1 + # self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) + # self.assertTrue(reply.succeeded()) + # self.assertTrue(reply.get_argument("cookie") == + # wi.get_handle()) + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 5 * len(subscriptions)) + #for ii in range(len(subscriptions)): + # self.assertTrue(subscriptions[ii][1] == 5) + + + + self.console.destroy(10) + + |