summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-02-24 19:40:31 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-02-24 19:40:31 +0000
commitdd248a04512bfcc14b4d9e3e1ee0e18cc66c568c (patch)
tree2336378209d61c90121d812076ba57e84488c72f
parent0e89a331de0d54b34de5bbe2531e3e48b272775f (diff)
downloadqpid-python-dd248a04512bfcc14b4d9e3e1ee0e18cc66c568c.tar.gz
QPID-2261: sync with msg formats defined on wiki, start subscription impl.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@915946 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/qmf/src/py/qmf2/agent.py673
-rw-r--r--qpid/extras/qmf/src/py/qmf2/common.py83
-rw-r--r--qpid/extras/qmf/src/py/qmf2/console.py528
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/__init__.py1
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py4
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/async_method.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/async_query.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/basic_method.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/basic_query.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/events.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/multi_response.py4
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py446
13 files changed, 1410 insertions, 341 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/agent.py b/qpid/extras/qmf/src/py/qmf2/agent.py
index a6b3c39ad1..069cf184ce 100644
--- a/qpid/extras/qmf/src/py/qmf2/agent.py
+++ b/qpid/extras/qmf/src/py/qmf2/agent.py
@@ -21,18 +21,19 @@ import logging
import datetime
import time
import Queue
-from threading import Thread, Lock, currentThread, Event
+from threading import Thread, RLock, currentThread, Event
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
-from common import (make_subject, parse_subject, OpCode, QmfQuery,
- SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
- timedelta_to_secs)
+from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem,
+ SchemaMethod, timedelta_to_secs, QMF_APP_ID)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
_callback_thread=None
+
+
##==============================================================================
## METHOD CALL
##==============================================================================
@@ -78,14 +79,73 @@ class MethodCallParams(object):
return self._user_id
+ ##==============================================================================
+ ## SUBSCRIPTIONS
+ ##==============================================================================
+
+class _ConsoleHandle(object):
+ """
+ """
+ def __init__(self, handle, reply_to):
+ self.console_handle = handle
+ self.reply_to = reply_to
+
+class SubscriptionParams(object):
+ """
+ """
+ def __init__(self, console_handle, query, interval, duration, user_id):
+ self._console_handle = console_handle
+ self._query = query
+ self._interval = interval
+ self._duration = duration
+ self._user_id = user_id
+
+ def get_console_handle(self):
+ return self._console_handle
+
+ def get_query(self):
+ return self._query
+
+ def get_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+ def get_user_id(self):
+ return self._user_id
+
+class _SubscriptionState(object):
+ """
+ An internally-managed subscription.
+ """
+ def __init__(self, reply_to, cid, query, interval, duration):
+ self.reply_to = reply_to
+ self.correlation_id = cid
+ self.query = query
+ self.interval = interval
+ self.duration = duration
+ now = datetime.datetime.utcnow()
+ self.next_update = now # do an immediate update
+ self.expiration = now + datetime.timedelta(seconds=duration)
+ self.id = 0
+
+ def resubscribe(self, now, _duration=None):
+ if _duration is not None:
+ self.duration = _duration
+ self.expiration = now + datetime.timedelta(seconds=self.duration)
+
+ def reset_interval(self, now):
+ self.next_update = now + datetime.timedelta(seconds=self.interval)
+
+
##==============================================================================
## AGENT
##==============================================================================
class Agent(Thread):
- def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
- _max_msg_size=0, _capacity=10):
+ def __init__(self, name, _domain=None, _notifier=None, **options):
Thread.__init__(self)
self._running = False
self._ready = Event()
@@ -94,11 +154,20 @@ class Agent(Thread):
self._domain = _domain
self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
- self._heartbeat_interval = _heartbeat_interval
+
+ # configurable parameters
+ #
+ self._heartbeat_interval = options.get("heartbeat_interval", 30)
+ self._capacity = options.get("capacity", 10)
+ self._default_duration = options.get("default_duration", 300)
+ self._max_duration = options.get("max_duration", 3600)
+ self._min_duration = options.get("min_duration", 10)
+ self._default_interval = options.get("default_interval", 30)
+ self._min_interval = options.get("min_interval", 5)
+
# @todo: currently, max # of objects in a single reply message, would
# be better if it were max bytesize of per-msg content...
- self._max_msg_size = _max_msg_size
- self._capacity = _capacity
+ self._max_msg_size = options.get("max_msg_size", 0)
self._conn = None
self._session = None
@@ -107,7 +176,7 @@ class Agent(Thread):
self._direct_sender = None
self._topic_sender = None
- self._lock = Lock()
+ self._lock = RLock()
self._packages = {}
self._schema_timestamp = long(0)
self._schema = {}
@@ -119,6 +188,10 @@ class Agent(Thread):
self._undescribed_data = {}
self._work_q = Queue.Queue()
self._work_q_put = False
+ # subscriptions
+ self._subscription_id = long(time.time())
+ self._subscriptions = {}
+ self._next_subscribe_event = None
def destroy(self, timeout=None):
@@ -192,10 +265,11 @@ class Agent(Thread):
if self.isAlive():
# kick my thread to wake it up
try:
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self.name,
- content={"noop":"noop"})
+ properties={ "method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
# TRACE
#logging.error("!!! sending wakeup to myself: %s" % msg)
@@ -258,13 +332,14 @@ class Agent(Thread):
raise Exception("No connection available")
# @todo: should we validate against the schema?
- _map = {"_name": self.get_name(),
- "_event": qmfEvent.map_encode()}
- msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+ msg = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
qmfEvent.get_severity() + "." + self.name,
- properties={"method":"response",
- "qmf.subject":make_subject(OpCode.event_ind)},
- content={MsgKey.event:_map})
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content": ContentType.event,
+ "qmf.agent":self.name},
+ content=[qmfEvent.map_encode()])
# TRACE
# logging.error("!!! Agent %s sending Event (%s)" %
# (self.name, str(msg)))
@@ -330,9 +405,10 @@ class Agent(Thread):
raise TypeError("Invalid type for error - must be QmfData")
_map[SchemaMethod.KEY_ERROR] = _error.map_encode()
- msg = Message( properties={"method":"response",
- "qmf.subject":make_subject(OpCode.response)},
- content={MsgKey.method:_map})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.method_rsp},
+ content=_map)
msg.correlation_id = handle.correlation_id
self._send_reply(msg, handle.reply_to)
@@ -370,25 +446,10 @@ class Agent(Thread):
while self._running:
- now = datetime.datetime.utcnow()
- # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
- if now >= next_heartbeat:
- ind = self._makeAgentIndMsg()
- ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
- # TRACE
- #logging.error("!!! Agent %s sending Heartbeat (%s)" %
- # (self.name, str(ind)))
- self._topic_sender.send(ind)
- logging.debug("Agent Indication Sent")
- next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
-
- timeout = timedelta_to_secs(next_heartbeat - now)
- # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
- try:
- self._session.next_receiver(timeout=timeout)
- except Empty:
- continue
-
+ #
+ # Process inbound messages
+ #
+ logging.debug("%s processing inbound messages..." % self.name)
for i in range(batch_limit):
try:
msg = self._topic_receiver.fetch(timeout=0)
@@ -409,7 +470,71 @@ class Agent(Thread):
# (self.name, self._direct_receiver.source, msg))
self._dispatch(msg, _direct=True)
+ #
+ # Send Heartbeat Notification
+ #
+ now = datetime.datetime.utcnow()
+ if now >= next_heartbeat:
+ logging.debug("%s sending heartbeat..." % self.name)
+ ind = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.agent_heartbeat_ind,
+ "qmf.agent":self.name},
+ content=self._makeAgentInfoBody())
+ # TRACE
+ #logging.error("!!! Agent %s sending Heartbeat (%s)" %
+ # (self.name, str(ind)))
+ self._topic_sender.send(ind)
+ logging.debug("Agent Indication Sent")
+ next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+
+
+ #
+ # Monitor Subscriptions
+ #
+ if (self._next_subscribe_event is None or
+ now >= self._next_subscribe_event):
+
+ logging.debug("%s polling subscriptions..." % self.name)
+ self._next_subscribe_event = now + datetime.timedelta(seconds=
+ self._max_duration)
+ self._lock.acquire()
+ try:
+ dead_ss = []
+ for sid,ss in self._subscriptions.iteritems():
+ if now >= ss.expiration:
+ dead_ss.append(sid)
+ continue
+ if now >= ss.next_update:
+ response = []
+ objs = self._queryData(ss.query)
+ if objs:
+ for obj in objs:
+ response.append(obj.map_encode())
+ logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id))
+ self._send_query_response( ContentType.data,
+ ss.correlation_id,
+ ss.reply_to,
+ response)
+ ss.reset_interval(now)
+
+ next_timeout = min(ss.expiration, ss.next_update)
+ if next_timeout < self._next_subscribe_event:
+ self._next_subscribe_event = next_timeout
+
+ for sid in dead_ss:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+ #
+ # notify application of pending WorkItems
+ #
+
if self._work_q_put and self._notifier:
+ logging.debug("%s notifying application..." % self.name)
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
@@ -417,19 +542,33 @@ class Agent(Thread):
self._notifier.indication()
_callback_thread = None
+ #
+ # Sleep until messages arrive or something times out
+ #
+ next_timeout = min(next_heartbeat, self._next_subscribe_event)
+ timeout = timedelta_to_secs(next_timeout -
+ datetime.datetime.utcnow())
+ if timeout > 0.0:
+ logging.debug("%s sleeping %s seconds..." % (self.name,
+ timeout))
+ try:
+ self._session.next_receiver(timeout=timeout)
+ except Empty:
+ pass
+
+
+
+
#
# Private:
#
- def _makeAgentIndMsg(self):
+ def _makeAgentInfoBody(self):
"""
- Create an agent indication message identifying this agent
+ Create an agent indication message body identifying this agent
"""
- _map = {"_name": self.get_name(),
+ return {"_name": self.get_name(),
"_schema_timestamp": self._schema_timestamp}
- return Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.agent_ind)},
- content={MsgKey.agent_info: _map})
def _send_reply(self, msg, reply_to):
"""
@@ -458,7 +597,7 @@ class Agent(Thread):
except SendError, e:
logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
- def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+ def _send_query_response(self, content_type, cid, reply_to, objects):
"""
Send a response to a query, breaking the result into multiple
messages based on the agent's _max_msg_size config parameter
@@ -472,24 +611,28 @@ class Agent(Thread):
start = 0
end = min(total, max_count)
- while end <= total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
+ # send partial response if too many objects present
+ while end < total:
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "partial":None,
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
correlation_id = cid,
- content={msgkey:objects[start:end]})
+ content=objects[start:end])
self._send_reply(m, reply_to)
- if end == total:
- break;
start = end
end = min(total, end + max_count)
- # response terminator - last message has empty object array
- if total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
- correlation_id = cid,
- content={msgkey: []} )
- self._send_reply(m, reply_to)
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
+ correlation_id = cid,
+ content=objects[start:end])
+ self._send_reply(m, reply_to)
def _dispatch(self, msg, _direct=False):
"""
@@ -497,33 +640,32 @@ class Agent(Thread):
@param _direct: True if msg directly addressed to this agent.
"""
- logging.debug( "Message received from Console! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- except:
- logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
- return
+ # logging.debug( "Message received from Console! [%s]" % msg )
+ # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) )
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
+ logging.warning("Ignoring unrecognized message '%s'" % msg)
+ return
+ version = 2 # @todo: fix me
cmap = {}; props={}
if msg.content_type == "amqp/map":
cmap = msg.content
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_locate:
+ if opcode == OpCode.agent_locate_req:
self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
- elif opcode == OpCode.get_query:
+ elif opcode == OpCode.query_req:
self._handleQueryMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.method_req:
self._handleMethodReqMsg(msg, cmap, props, version, _direct)
- elif opcode == OpCode.cancel_subscription:
- logging.warning("!!! CANCEL_SUB TBD !!!")
- elif opcode == OpCode.create_subscription:
- logging.warning("!!! CREATE_SUB TBD !!!")
- elif opcode == OpCode.renew_subscription:
- logging.warning("!!! RENEW_SUB TBD !!!")
- elif opcode == OpCode.schema_query:
- logging.warning("!!! SCHEMA_QUERY TBD !!!")
+ elif opcode == OpCode.subscribe_req:
+ self._handleSubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_refresh_req:
+ self._handleResubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_cancel_ind:
+ self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -536,18 +678,28 @@ class Agent(Thread):
"""
logging.debug("_handleAgentLocateMsg")
- reply = True
- if "method" in props and props["method"] == "request":
- query = cmap.get(MsgKey.query)
- if query is not None:
- # fake a QmfData containing my identifier for the query compare
- tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
- self.get_name()},
- _object_id="my-name")
- reply = QmfQuery.from_map(query).evaluate(tmpData)
+ reply = False
+ if props.get("method") == "request":
+ # if the message is addressed to me or wildcard, process it
+ if (msg.subject == "console.ind" or
+ msg.subject == "console.ind.locate" or
+ msg.subject == "console.ind.locate." + self.name):
+ pred = msg.content
+ if not pred:
+ reply = True
+ elif isinstance(pred, type([])):
+ # fake a QmfData containing my identifier for the query compare
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred)
+ tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+ self.get_name()},
+ _object_id="my-name")
+ reply = query.evaluate(tmpData)
if reply:
- m = self._makeAgentIndMsg()
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.agent_locate_rsp},
+ content=self._makeAgentInfoBody())
m.correlation_id = msg.correlation_id
self._send_reply(m, msg.reply_to)
else:
@@ -561,22 +713,25 @@ class Agent(Thread):
logging.debug("_handleQueryMsg")
if "method" in props and props["method"] == "request":
- qmap = cmap.get(MsgKey.query)
- if qmap:
- query = QmfQuery.from_map(qmap)
+ if cmap:
+ try:
+ query = QmfQuery.from_map(cmap)
+ except TypeError:
+ logging.error("Invalid Query format: '%s'" % str(cmap))
+ return
target = query.get_target()
if target == QmfQuery.TARGET_PACKAGES:
- self._queryPackages( msg, query )
+ self._queryPackagesReply( msg, query )
elif target == QmfQuery.TARGET_SCHEMA_ID:
- self._querySchema( msg, query, _idOnly=True )
+ self._querySchemaReply( msg, query, _idOnly=True )
elif target == QmfQuery.TARGET_SCHEMA:
- self._querySchema( msg, query)
+ self._querySchemaReply( msg, query)
elif target == QmfQuery.TARGET_AGENT:
logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
elif target == QmfQuery.TARGET_OBJECT_ID:
- self._queryData(msg, query, _idOnly=True)
+ self._queryDataReply(msg, query, _idOnly=True)
elif target == QmfQuery.TARGET_OBJECT:
- self._queryData(msg, query)
+ self._queryDataReply(msg, query)
else:
logging.warning("Unrecognized query target: '%s'" % str(target))
@@ -634,7 +789,159 @@ class Agent(Thread):
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
- def _queryPackages(self, msg, query):
+ def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Subscription Request
+ """
+ if "method" in props and props["method"] == "request":
+ query_map = cmap.get("_query")
+ interval = cmap.get("_interval")
+ duration = cmap.get("_duration")
+
+ try:
+ query = QmfQuery.from_map(query_map)
+ except TypeError:
+ logging.warning("Invalid query for subscription: %s" %
+ str(query_map))
+ return
+
+ if isinstance(self, AgentExternal):
+ # param = SubscriptionParams(_ConsoleHandle(console_handle,
+ # msg.reply_to),
+ # query,
+ # interval,
+ # duration,
+ # msg.user_id)
+ # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST,
+ # msg.correlation_id, param))
+ # self._work_q_put = True
+ logging.error("External Subscription TBD")
+ return
+
+ # validate the query - only specific objects, or
+ # objects wildcard, are currently supported.
+ if (query.get_target() != QmfQuery.TARGET_OBJECT or
+ (query.get_selector() == QmfQuery.PREDICATE and
+ query.get_predicate())):
+ logging.error("Subscriptions only support (wildcard) Object"
+ " Queries.")
+ err = QmfData.create(
+ {"reason": "Unsupported Query type for subscription.",
+ "query": str(query.map_encode())})
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content={"_error": err.map_encode()})
+ self._send_reply(m, msg.reply_to)
+ return
+
+ if duration is None:
+ duration = self._default_duration
+ else:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.warning("Bad duration value: %s" % str(msg))
+ duration = self._default_duration
+
+ if interval is None:
+ interval = self._default_interval
+ else:
+ try:
+ interval = float(interval)
+ if interval < self._min_interval:
+ interval = self._min_interval
+ except:
+ logging.warning("Bad interval value: %s" % str(msg))
+ interval = self._default_interval
+
+ ss = _SubscriptionState(msg.reply_to,
+ msg.correlation_id,
+ query,
+ interval,
+ duration)
+ self._lock.acquire()
+ try:
+ sid = self._subscription_id
+ self._subscription_id += 1
+ ss.id = sid
+ self._subscriptions[sid] = ss
+ self._next_subscribe_event = None
+ finally:
+ self._lock.release()
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": interval,
+ "_duration": duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+
+ def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Renew Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.debug("Invalid subscription refresh msg: %s" %
+ str(msg))
+ return
+
+ self._lock.acquire()
+ try:
+ ss = self._subscriptions.get(sid)
+ if not ss:
+ logging.debug("Ignoring unknown subscription: %s" %
+ str(sid))
+ return
+ duration = cmap.get("_duration")
+ if duration is not None:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.debug("Bad duration value: %s" % str(msg))
+ duration = None # use existing duration
+
+ ss.resubscribe(datetime.datetime.utcnow(), duration)
+
+ finally:
+ self._lock.release()
+
+
+ def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Cancel Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.warning("No subscription id supplied: %s" % msg)
+ return
+
+ self._lock.acquire()
+ try:
+ if sid in self._subscriptions:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+
+ def _queryPackagesReply(self, msg, query):
"""
Run a query against the list of known packages
"""
@@ -646,58 +953,83 @@ class Agent(Thread):
_object_id="_package")
if query.evaluate(qmfData):
pnames.append(name)
+
+ self._send_query_response(ContentType.schema_package,
+ msg.correlation_id,
+ msg.reply_to,
+ pnames)
finally:
self._lock.release()
- self._send_query_response(make_subject(OpCode.data_ind),
- MsgKey.package_info,
- msg.correlation_id,
- msg.reply_to,
- pnames)
- def _querySchema( self, msg, query, _idOnly=False ):
+ def _querySchemaReply( self, msg, query, _idOnly=False ):
"""
"""
schemas = []
- # if querying for a specific schema, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
found = self._schema.get(query.get_id())
- finally:
- self._lock.release()
- if found:
- if _idOnly:
- schemas.append(query.get_id().map_encode())
- else:
- schemas.append(found.map_encode())
- else: # otherwise, evaluate all schema
- self._lock.acquire()
- try:
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
for sid,val in self._schema.iteritems():
if query.evaluate(val):
if _idOnly:
schemas.append(sid.map_encode())
else:
schemas.append(val.map_encode())
- finally:
- self._lock.release()
+ if _idOnly:
+ msgkey = ContentType.schema_id
+ else:
+ msgkey = ContentType.schema_class
- if _idOnly:
- msgkey = MsgKey.schema_id
- else:
- msgkey = MsgKey.schema
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ schemas)
+ finally:
+ self._lock.release()
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- schemas)
+ def _queryDataReply( self, msg, query, _idOnly=False ):
+ """
+ """
+ # hold the (recursive) lock for the duration so the Agent
+ # won't send data that is currently being modified by the
+ # app.
+ self._lock.acquire()
+ try:
+ response = []
+ data_objs = self._queryData(query)
+ if _idOnly:
+ for obj in data_objs:
+ response.append(obj.get_object_id())
+ else:
+ for obj in data_objs:
+ response.append(obj.map_encode())
+
+ if _idOnly:
+ msgkey = ContentType.object_id
+ else:
+ msgkey = ContentType.data
- def _queryData( self, msg, query, _idOnly=False ):
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ response)
+ finally:
+ self._lock.release()
+
+
+ def _queryData(self, query):
"""
+ Return a list of QmfData objects that match a given query
"""
data_objs = []
# extract optional schema_id from target params
@@ -705,12 +1037,12 @@ class Agent(Thread):
t_params = query.get_target_param()
if t_params:
sid = t_params.get(QmfData.KEY_SCHEMA_ID)
- # if querying for a specific object, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- oid = query.get_id()
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ oid = query.get_id()
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -718,11 +1050,9 @@ class Agent(Thread):
and name.get_package_name() == sid.get_package_name()):
found = db.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
+ data_objs.append(found)
else:
+ found = None
if sid:
db = self._described_data.get(sid)
if db:
@@ -730,15 +1060,9 @@ class Agent(Thread):
else:
found = self._undescribed_data.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
- finally:
- self._lock.release()
- else: # otherwise, evaluate all data
- self._lock.acquire()
- try:
+ data_objs.append(found)
+
+ else: # otherwise, evaluate all data
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -746,10 +1070,7 @@ class Agent(Thread):
and name.get_package_name() == sid.get_package_name()):
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
+ data_objs.append(data)
else:
if sid:
db = self._described_data.get(sid)
@@ -759,23 +1080,28 @@ class Agent(Thread):
if db:
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
- finally:
- self._lock.release()
+ data_objs.append(data)
+ finally:
+ self._lock.release()
- if _idOnly:
- msgkey = MsgKey.object_id
- else:
- msgkey = MsgKey.data_obj
+ return data_objs
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- data_objs)
+
+
+ ##==============================================================================
+ ## EXTERNAL DATABASE AGENT
+ ##==============================================================================
+
+class AgentExternal(Agent):
+ """
+ An Agent which uses an external management database.
+ """
+ def __init__(self, name, _domain=None, _notifier=None,
+ _heartbeat_interval=30, _max_msg_size=0, _capacity=10):
+ super(AgentExternal, self).__init__(name, _domain, _notifier,
+ _heartbeat_interval,
+ _max_msg_size, _capacity)
+ logging.error("AgentExternal TBD")
@@ -823,9 +1149,11 @@ class QmfAgentData(QmfData):
_schema_id=schema_id, _const=False)
self._agent = agent
self._validated = False
+ self._modified = True
def destroy(self):
self._dtime = long(time.time() * 1000)
+ self._touch()
# @todo: publish change
def is_deleted(self):
@@ -833,6 +1161,7 @@ class QmfAgentData(QmfData):
def set_value(self, _name, _value, _subType=None):
super(QmfAgentData, self).set_value(_name, _value, _subType)
+ self._touch()
# @todo: publish change
def inc_value(self, name, delta=1):
@@ -849,6 +1178,7 @@ class QmfAgentData(QmfData):
""" subtract the delta from the property """
# @todo: need to take write-lock
logging.error(" TBD!!!")
+ self._touch()
def validate(self):
"""
@@ -868,6 +1198,13 @@ class QmfAgentData(QmfData):
raise Exception("Required property '%s' not present." % name)
self._validated = True
+ def _touch(self):
+ """
+ Mark this object as modified. Used to force a publish of this object
+ if on subscription.
+ """
+ self._modified = True
+
################################################################################
diff --git a/qpid/extras/qmf/src/py/qmf2/common.py b/qpid/extras/qmf/src/py/qmf2/common.py
index 8107b86666..8070add806 100644
--- a/qpid/extras/qmf/src/py/qmf2/common.py
+++ b/qpid/extras/qmf/src/py/qmf2/common.py
@@ -34,61 +34,44 @@ log_query = getLogger("qmf.query")
## Constants
##
-AMQP_QMF_SUBJECT = "qmf"
-AMQP_QMF_VERSION = 4
-AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
-
-class MsgKey(object):
- agent_info = "agent_info"
- query = "query"
- package_info = "package_info"
- schema_id = "schema_id"
- schema = "schema"
- object_id="object_id"
- data_obj="object"
- method="method"
- event="event"
+QMF_APP_ID="qmf2"
-class OpCode(object):
- noop = "noop"
-
- # codes sent by a console and processed by the agent
- agent_locate = "agent-locate"
- cancel_subscription = "cancel-subscription"
- create_subscription = "create-subscription"
- get_query = "get-query"
- method_req = "method"
- renew_subscription = "renew-subscription"
- schema_query = "schema-query" # @todo: deprecate
-
- # codes sent by the agent to a console
- agent_ind = "agent"
- data_ind = "data"
- event_ind = "event"
- managed_object = "managed-object"
- object_ind = "object"
- response = "response"
- schema_ind="schema" # @todo: deprecate
+class ContentType(object):
+ """ Values for the 'qmf.content' message header
+ """
+ schema_package = "_schema_package"
+ schema_id = "_schema_id"
+ schema_class = "_schema_class"
+ object_id = "_object_id"
+ data = "_data"
+ event = "_event"
+class OpCode(object):
+ """ Values for the 'qmf.opcode' message header.
+ """
+ noop = "_noop"
+ # codes sent by a console and processed by the agent
+ agent_locate_req = "_agent_locate_request"
+ subscribe_req = "_subscribe_request"
+ subscribe_cancel_ind = "_subscribe_cancel_indication"
+ subscribe_refresh_req = "_subscribe_refresh_indication"
+ query_req = "_query_request"
+ method_req = "_method_request"
-def make_subject(_code):
- """
- Create a message subject field value.
- """
- return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code)
+ # codes sent by the agent to a console
+ agent_locate_rsp = "_agent_locate_response"
+ agent_heartbeat_ind = "_agent_heartbeat_indication"
+ query_rsp = "_query_response"
+ subscribe_rsp = "_subscribe_response"
+ subscribe_refresh_rsp = "_subscribe_refresh_response"
+ data_ind = "_data_indication"
+ method_rsp = "_method_response"
-def parse_subject(_sub):
- """
- Deconstruct a subject field, return version,opcode values
- """
- if _sub[:3] != "qmf":
- raise Exception("Non-QMF message received")
- return _sub[3:].split('.', 1)
def timedelta_to_secs(td):
"""
@@ -133,11 +116,15 @@ class WorkItem(object):
AGENT_HEARTBEAT=8
QUERY_COMPLETE=9
METHOD_RESPONSE=10
+ SUBSCRIBE_RESPONSE=11
+ SUBSCRIBE_INDICATION=12
+ RESUBSCRIBE_RESPONSE=13
# Enumeration of the types of WorkItems produced on the Agent
METHOD_CALL=1000
QUERY=1001
- SUBSCRIBE=1002
- UNSUBSCRIBE=1003
+ SUBSCRIBE_REQUEST=1002
+ RESUBSCRIBE_REQUEST=1003
+ UNSUBSCRIBE_REQUEST=1004
def __init__(self, kind, handle, _params=None):
"""
diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py
index c13cf70755..d8b625068e 100644
--- a/qpid/extras/qmf/src/py/qmf2/console.py
+++ b/qpid/extras/qmf/src/py/qmf2/console.py
@@ -24,14 +24,14 @@ import time
import datetime
import Queue
from threading import Thread, Event
-from threading import Lock
+from threading import RLock
from threading import currentThread
from threading import Condition
from qpid.messaging import Connection, Message, Empty, SendError
-from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
- MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
+from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId,
SchemaEventClass, SchemaObjectClass, WorkItem,
SchemaMethod, QmfEvent, timedelta_to_secs)
@@ -141,6 +141,7 @@ class _AsyncMailbox(_Mailbox):
console._lock.acquire()
try:
console._async_mboxes[self.cid] = self
+ console._next_mbox_expire = None
finally:
console._lock.release()
@@ -177,7 +178,7 @@ class _QueryMailbox(_AsyncMailbox):
def __init__(self, console,
agent_name,
context,
- target, msgkey,
+ target,
_timeout=None):
"""
Invoked by application thread.
@@ -186,7 +187,6 @@ class _QueryMailbox(_AsyncMailbox):
_timeout)
self.agent_name = agent_name
self.target = target
- self.msgkey = msgkey
self.context = context
self.result = []
@@ -195,11 +195,8 @@ class _QueryMailbox(_AsyncMailbox):
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
- done = False
- objects = reply.content.get(self.msgkey)
- if not objects:
- done = True
- else:
+ objects = reply.content
+ if isinstance(objects, type([])):
# convert from map to native types if needed
if self.target == QmfQuery.TARGET_SCHEMA_ID:
for sid_map in objects:
@@ -237,8 +234,7 @@ class _QueryMailbox(_AsyncMailbox):
# no conversion needed.
self.result += objects
- if done:
- # create workitem
+ if not "partial" in reply.properties:
# logging.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
@@ -278,8 +274,8 @@ class _SchemaPrefetchMailbox(_AsyncMailbox):
Process schema response messages.
"""
done = False
- schemas = reply.content.get(MsgKey.schema)
- if schemas:
+ schemas = reply.content
+ if schemas and isinstance(schemas, type([])):
for schema_map in schemas:
# extract schema id, convert based on schema type
sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
@@ -319,8 +315,8 @@ class _MethodMailbox(_AsyncMailbox):
Invoked by Console Management thread only.
"""
- _map = reply.content.get(MsgKey.method)
- if not _map:
+ _map = reply.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
result = None
else:
@@ -355,6 +351,128 @@ class _MethodMailbox(_AsyncMailbox):
+class _SubscriptionMailbox(_AsyncMailbox):
+ """
+ A Mailbox for a single subscription.
+ """
+ def __init__(self, console, lifetime, context, agent):
+ """
+ Invoked by application thread.
+ """
+ super(_SubscriptionMailbox, self).__init__(console, lifetime)
+ self.cv = Condition()
+ self.data = []
+ self.result = []
+ self.context = context
+ self.agent_name = agent.get_name()
+ self.agent_subscription_id = None # from agent
+
+ def deliver(self, msg):
+ """
+ """
+ opcode = msg.properties.get("qmf.opcode")
+ if (opcode == OpCode.subscribe_rsp or
+ opcode == OpCode.subscribe_refresh_rsp):
+ #
+ # sync only - just deliver the msg
+ #
+ self.cv.acquire()
+ try:
+ self.data.append(msg)
+ # if was empty, notify waiters
+ if len(self.data) == 1:
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ return
+
+ # sid = msg.content.get("_subscription_id")
+ # lifetime = msg.content.get("_duration")
+ # error = msg.content.get("_error")
+ # sp = SubscribeParams(sid,
+ # msg.content.get("_interval"),
+ # lifetime, error)
+ # if sid and self.subscription_id is None:
+ # self.subscription_id = sid
+ # if lifetime:
+ # self.console._lock.acquire()
+ # try:
+ # self.expiration_date = (datetime.datetime.utcnow() +
+ # datetime.timedelta(seconds=lifetime))
+ # finally:
+ # self.console._lock.release()
+
+ # if self.waiting:
+ # self.cv.acquire()
+ # try:
+ # self.data.append(sp)
+ # # if was empty, notify waiters
+ # if len(self._data) == 1:
+ # self._cv.notify()
+ # finally:
+ # self._cv.release()
+ # else:
+ # if opcode == OpCode.subscribe_rsp:
+ # wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ # self.context, sp)
+ # else:
+ # wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ # self.context, sp)
+ # self.console._work_q.put(wi)
+ # self.console._work_q_put = True
+ # if error:
+ # self.destroy()
+
+ agent_name = msg.properties.get("qmf.agent")
+ if not agent_name:
+ logging.warning("Ignoring data_ind - no agent name given: %s" %
+ msg)
+ return
+ agent = self.console.get_agent(agent_name)
+ if not agent:
+ logging.warning("Ignoring data_ind - unknown agent '%s'" %
+ agent_name)
+ return
+
+ objects = msg.content
+ for obj_map in objects:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self.console._prefetch_schema(sid, agent)
+ self.result.append(obj)
+
+ if not "partial" in msg.properties:
+ wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result)
+ self.result = []
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ def fetch(self, timeout=None):
+ """
+ Get one data item from a mailbox, with timeout.
+ Invoked by application thread.
+ """
+ self.cv.acquire()
+ try:
+ if len(self.data) == 0:
+ self.cv.wait(timeout)
+ if len(self.data):
+ return self.data.pop(0)
+ return None
+ finally:
+ self.cv.release()
+
+ def expire(self):
+ """ The subscription expired.
+ """
+ self.destroy()
+
+
+
+
+
##==============================================================================
## DATA MODEL
##==============================================================================
@@ -481,8 +599,8 @@ class QmfConsoleData(QmfData):
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -650,8 +768,8 @@ class Agent(object):
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -676,20 +794,66 @@ class Agent(object):
def _send_query(self, query, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.get_query)},
- content={MsgKey.query: query.map_encode()})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.query_req},
+ content=query.map_encode())
self._send_msg( msg, correlation_id )
def _send_method_req(self, mr_map, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.method_req)},
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.method_req},
content=mr_map)
self._send_msg( msg, correlation_id )
+ def _send_subscribe_req(self, query, correlation_id, _interval=None,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_query":query.map_encode()}
+ if _interval is not None:
+ sr_map["_interval"] = _interval
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_resubscribe_req(self, correlation_id,
+ subscription_id,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_refresh_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_unsubscribe_ind(self, correlation_id, subscription_id):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_cancel_ind},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
##==============================================================================
## METHOD CALL
@@ -716,6 +880,36 @@ class MethodResult(object):
return arg
+
+ ##==============================================================================
+ ## SUBSCRIPTION
+ ##==============================================================================
+
+class SubscribeParams(object):
+ """ Represents a standing subscription for this console.
+ """
+ def __init__(self, sid, interval, duration, _error=None):
+ self._sid = sid
+ self._interval = interval
+ self._duration = duration
+ self._error = _error
+
+ def succeeded(self):
+ return self._error is None
+
+ def get_error(self):
+ return self._error
+
+ def get_subscription_id(self):
+ return self._sid
+
+ def get_publish_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+
##==============================================================================
## CONSOLE
##==============================================================================
@@ -753,7 +947,7 @@ class Console(Thread):
self._domain = _domain
self._address = QmfAddress.direct(self._name, self._domain)
self._notifier = notifier
- self._lock = Lock()
+ self._lock = RLock()
self._conn = None
self._session = None
# dict of "agent-direct-address":class Agent entries
@@ -766,6 +960,7 @@ class Console(Thread):
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
+ self._subscribe_timeout = 300 # @todo: parameterize
self._next_agent_expire = None
self._next_mbox_expire = None
# for passing WorkItems to the application
@@ -776,18 +971,6 @@ class Console(Thread):
self._post_office = {} # indexed by cid
self._async_mboxes = {} # indexed by cid, used to expire them
- ## Old stuff below???
- #self._broker_list = []
- #self.impl = qmfengine.Console()
- #self._event = qmfengine.ConsoleEvent()
- ##self._cv = Condition()
- ##self._sync_count = 0
- ##self._sync_result = None
- ##self._select = {}
- ##self._cb_cond = Condition()
-
-
-
def destroy(self, timeout=None):
"""
Must be called before the Console is deleted.
@@ -801,8 +984,6 @@ class Console(Thread):
self.remove_connection(self._conn, timeout)
logging.debug("Console Destroyed")
-
-
def add_connection(self, conn):
"""
Add a AMQP connection to the console. The console will setup a session over the
@@ -934,10 +1115,11 @@ class Console(Thread):
cid = mbox.get_address()
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
- msg = Message(subject="console.ind.locate." + name,
+ msg = Message(id=QMF_APP_ID,
+ subject="console.ind.locate." + name,
properties={"method":"request",
- "qmf.subject":make_subject(OpCode.agent_locate)},
- content={MsgKey.query: query.map_encode()})
+ "qmf.opcode":OpCode.agent_locate_req},
+ content=query._predicate)
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
logging.debug("Sending Agent Locate (%s)" % time.time())
@@ -995,23 +1177,13 @@ class Console(Thread):
def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
"""
"""
- query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
- QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
- QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
- QmfQuery.TARGET_SCHEMA: MsgKey.schema,
- QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
- QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
target = query.get_target()
- msgkey = query_keymap.get(target)
- if not msgkey:
- raise Exception("Invalid target for query: %s" % str(query))
if _reply_handle is not None:
mbox = _QueryMailbox(self,
agent.get_name(),
_reply_handle,
- target, msgkey,
+ target,
_timeout)
else:
mbox = _SyncMailbox(self)
@@ -1045,9 +1217,8 @@ class Console(Thread):
logging.debug("Query wait timed-out.")
break
- objects = reply.content.get(msgkey)
- if not objects:
- # last response is empty
+ objects = reply.content
+ if not objects or not isinstance(objects, type([])):
break
# convert from map to native types if needed
@@ -1081,21 +1252,154 @@ class Console(Thread):
# no conversion needed.
response += objects
+ if not "partial" in reply.properties:
+ # reply not broken up over multiple msgs
+ break
+
now = datetime.datetime.utcnow()
mbox.destroy()
return response
+
+ def create_subscription(self, agent, query, console_handle,
+ _interval=None, _duration=None,
+ _reply_handle=None, _timeout=None):
+ if not _duration:
+ _duration = self._subscribe_timeout
+
+ if _reply_handle is not None:
+ assert(False) # async TBD
+ else:
+ mbox = _SubscriptionMailbox(self, _duration, console_handle, agent)
+
+ cid = mbox.get_address()
+
+ try:
+ logging.debug("Sending Subscribe to Agent (%s)" % time.time())
+ agent._send_subscribe_req(query, cid, _interval, _duration)
+ except SendError, e:
+ logging.error(str(e))
+ mbox.destroy()
+ return None
+
+ if _reply_handle is not None:
+ return True
+
+ # wait for reply
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ # @todo: what if mbox expires here?
+ replyMsg = mbox.fetch(_timeout)
+
+ if not replyMsg:
+ logging.debug("Subscription request wait timed-out.")
+ mbox.destroy()
+ return None
+
+ error = replyMsg.content.get("_error")
+ if error:
+ mbox.destroy()
+ try:
+ e_map = QmfData.from_map(error)
+ except TypeError:
+ e_map = QmfData.create({"error":"Unknown error"})
+ return SubscribeParams(None, None, None, e_map)
+
+ mbox.agent_subscription_id = replyMsg.content.get("_subscription_id")
+ return SubscribeParams(mbox.get_address(),
+ replyMsg.content.get("_interval"),
+ replyMsg.content.get("_duration"),
+ None)
+
+ def refresh_subscription(self, subscription_id,
+ _duration=None,
+ _reply_handle=None, _timeout=None):
+ if _reply_handle is not None:
+ assert(False) # async TBD
+
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ logging.warning("Subscription %s not found." % subscription_id)
+ return None
+
+ agent = self.get_agent(mbox.agent_name)
+ if not agent:
+ logging.warning("Subscription %s agent %s not found." %
+ (mbox.agent_name, subscription_id))
+ return None
+
+ try:
+ logging.debug("Sending Subscribe to Agent (%s)" % time.time())
+ agent._send_resubscribe_req(subscription_id,
+ mbox.agent_subscription_id,
+ _duration)
+ except SendError, e:
+ logging.error(str(e))
+ # @todo ???? mbox.destroy()
+ return None
+
+ if _reply_handle is not None:
+ return True
+
+ # wait for reply
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ replyMsg = mbox.fetch(_timeout)
+
+ if not replyMsg:
+ logging.debug("Subscription request wait timed-out.")
+ # @todo???? mbox.destroy()
+ return None
+
+ error = replyMsg.content.get("_error")
+ if error:
+ # @todo mbox.destroy()
+ try:
+ e_map = QmfData.from_map(error)
+ except TypeError:
+ e_map = QmfData.create({"error":"Unknown error"})
+ return SubscribeParams(None, None, None, e_map)
+
+ return SubscribeParams(mbox.get_address(),
+ replyMsg.content.get("_interval"),
+ replyMsg.content.get("_duration"),
+ None)
+
+ def cancel_subscription(self, subscription_id):
+ """
+ """
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ return None
+
+ agent = self.get_agent(mbox.agent_name)
+ if agent:
+ try:
+ logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+ agent._send_unsubscribe_ind(subscription_id,
+ mbox.agent_subscription_id)
+ except SendError, e:
+ logging.error(str(e))
+
+ mbox.destroy()
+
+
def _wake_thread(self):
"""
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
logging.debug("Sending noop to wake up [%s]" % self._address)
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self._name,
- content={"noop":"noop"})
+ properties={"method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
@@ -1152,9 +1456,17 @@ class Console(Thread):
# to expire, or a mailbox requrest to time out
now = datetime.datetime.utcnow()
next_expire = self._next_agent_expire
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
+
+ # the mailbox expire flag may be cleared by the
+ # app thread(s)
+ self._lock.acquire()
+ try:
+ if (self._next_mbox_expire and
+ self._next_mbox_expire < next_expire):
+ next_expire = self._next_mbox_expire
+ finally:
+ self._lock.release()
+
if next_expire > now:
timeout = timedelta_to_secs(next_expire - now)
try:
@@ -1268,13 +1580,14 @@ class Console(Thread):
"""
PRIVATE: Process a message received from an Agent
"""
- logging.debug( "Message received from Agent! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- # @todo: deal with version mismatch!!!
- except:
+ #logging.debug( "Message received from Agent! [%s]" % msg )
+ #logging.error( "Message received from Agent! [%s]" % msg )
+
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
logging.error("Ignoring unrecognized message '%s'" % msg)
return
+ version = 2 # @todo: fix me
cmap = {}; props = {}
if msg.content_type == "amqp/map":
@@ -1282,20 +1595,21 @@ class Console(Thread):
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_ind:
+ if opcode == OpCode.agent_heartbeat_ind:
self._handle_agent_ind_msg( msg, cmap, version, _direct )
- elif opcode == OpCode.data_ind:
- self._handle_data_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.event_ind:
- self._handle_event_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.managed_object:
- logging.warning("!!! managed_object TBD !!!")
- elif opcode == OpCode.object_ind:
- logging.warning("!!! object_ind TBD !!!")
- elif opcode == OpCode.response:
+ elif opcode == OpCode.agent_locate_rsp:
+ self._handle_agent_ind_msg( msg, cmap, version, _direct )
+ elif opcode == OpCode.query_rsp:
self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.schema_ind:
- logging.warning("!!! schema_ind TBD !!!")
+ elif opcode == OpCode.subscribe_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.method_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.data_ind:
+ if msg.correlation_id:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ else:
+ self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -1309,7 +1623,7 @@ class Console(Thread):
"""
logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
- ai_map = cmap.get(MsgKey.agent_info)
+ ai_map = msg.content
if not ai_map or not isinstance(ai_map, type({})):
logging.warning("Bad agent-ind message received: '%s'" % msg)
return
@@ -1359,29 +1673,10 @@ class Console(Thread):
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_data_ind_msg(self, msg, cmap, version, direct):
- """
- Process a received data-ind message.
- """
- logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
-
- mbox = self._get_mailbox(msg.correlation_id)
- if not mbox:
- logging.debug("Data indicate received with unknown correlation_id"
- " msg='%s'" % str(msg))
- return
-
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" %
- msg.correlation_id)
- mbox.deliver(msg)
-
-
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
- # @todo code replication - clean me.
logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
mbox = self._get_mailbox(msg.correlation_id)
@@ -1394,19 +1689,22 @@ class Console(Thread):
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_event_ind_msg(self, msg, cmap, version, _direct):
- ei_map = cmap.get(MsgKey.event)
- if not ei_map or not isinstance(ei_map, type({})):
- logging.warning("Bad event indication message received: '%s'" % msg)
- return
+ def _handle_indication_msg(self, msg, cmap, version, _direct):
- aname = ei_map.get("_name")
- emap = ei_map.get("_event")
+ aname = msg.properties.get("qmf.agent")
if not aname:
- logging.debug("No '_name' field in event indication message.")
+ logging.debug("No agent name field in indication message.")
return
- if not emap:
- logging.debug("No '_event' field in event indication message.")
+
+ content_type = msg.properties.get("qmf.content")
+ if (content_type != ContentType.event or
+ not isinstance(msg.content, type([]))):
+ logging.warning("Bad event indication message received: '%s'" % msg)
+ return
+
+ emap = msg.content[0]
+ if not isinstance(emap, type({})):
+ logging.debug("Invalid event body in indication message: '%s'" % msg)
return
agent = None
@@ -1439,13 +1737,13 @@ class Console(Thread):
"""
Check all async mailboxes for outstanding requests that have expired.
"""
- now = datetime.datetime.utcnow()
- if self._next_mbox_expire and now < self._next_mbox_expire:
- return
- expired_mboxes = []
- self._next_mbox_expire = None
self._lock.acquire()
try:
+ now = datetime.datetime.utcnow()
+ if self._next_mbox_expire and now < self._next_mbox_expire:
+ return
+ expired_mboxes = []
+ self._next_mbox_expire = None
for mbox in self._async_mboxes.itervalues():
if now >= mbox.expiration_date:
expired_mboxes.append(mbox)
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
index 186f09349e..eff9357e1f 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
@@ -27,3 +27,4 @@ import events
import multi_response
import async_query
import async_method
+import subscriptions
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
index 59b65221e0..0e5e595695 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
@@ -52,8 +52,8 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.notifier = _testNotifier()
self.agent = qmf2.agent.Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ _notifier=self.notifier,
+ heartbeat_interval=heartbeat)
# No database needed for this test
self.running = False
self.ready = Event()
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
index 556b62756f..965a254f26 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
index 3a9a767bf0..7c7b22fdaf 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
index be2bdff9ab..22accb7cfc 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
index dd321cb4bb..be67c36d87 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/events.py b/qpid/extras/qmf/src/py/qmf2/tests/events.py
index e55dc8572e..bc6465f25b 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/events.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/events.py
@@ -58,7 +58,7 @@ class _agentApp(Thread):
self.notifier = _testNotifier()
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
index d3d00a70c5..7c24435e79 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
@@ -60,8 +60,8 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat,
- _max_msg_size=_MAX_OBJS_PER_MSG)
+ heartbeat_interval=heartbeat,
+ max_msg_size=_MAX_OBJS_PER_MSG)
# Dynamically construct a management database
for i in range(self.schema_count):
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
index 5b1446bb3a..466457d670 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
@@ -54,7 +54,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Management Database
# - two different schema packages,
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
new file mode 100644
index 0000000000..d82f7855ba
--- /dev/null
+++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
@@ -0,0 +1,446 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+import datetime
+import time
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, WorkItem)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, broker_url, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.broker_url = broker_url
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ heartbeat_interval=heartbeat,
+ max_duration=10,
+ default_duration=7,
+ min_duration=5,
+ min_interval=1,
+ default_interval=2)
+
+ # Management Database
+ # - two different schema packages,
+ # - two classes within one schema package
+ # - multiple objects per schema package+class
+ # - two "undescribed" objects
+
+ # "package1/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"),
+ _desc="A test data schema - one",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p1c1_key1"},
+ _schema=_schema)
+ _obj.set_value("count1", 0)
+ _obj.set_value("count2", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p1c1_key2"},
+ _schema=_schema )
+ _obj.set_value("count1", 9)
+ _obj.set_value("count2", 10)
+ self.agent.add_object( _obj )
+
+ # "package1/class2"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"),
+ _desc="A test data schema - two",
+ _object_id_names=["name"] )
+ # add properties
+ _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent,
+ _values={"name":"p1c2_name1"},
+ _schema=_schema )
+ _obj.set_value("string1", "a data string")
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent,
+ _values={"name":"p1c2_name2"},
+ _schema=_schema )
+ _obj.set_value("string1", "another data string")
+ self.agent.add_object( _obj )
+
+
+ # "package2/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"),
+ _desc="A test data schema - second package",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p2c1_key1"},
+ _schema=_schema )
+ _obj.set_value("counter", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p2c1_key2"},
+ _schema=_schema )
+ _obj.set_value("counter", 2112)
+ self.agent.add_object( _obj )
+
+
+ # add two "unstructured" objects to the Agent
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-1")
+ _obj.set_value("field1", "a value")
+ _obj.set_value("field2", 2)
+ _obj.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj.set_value("field4", ["a", "list", "value"])
+ self.agent.add_object(_obj)
+
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-2")
+ _obj.set_value("key-1", "a value")
+ _obj.set_value("key-2", 2)
+ self.agent.add_object(_obj)
+
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
+ self.running = True
+ self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
+
+ def stop_app(self):
+ self.running = False
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+ self.ready.set()
+
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+ if self.conn:
+ self.agent.remove_connection(10)
+ self.agent.destroy(10)
+
+
+class BaseTest(unittest.TestCase):
+ agent_count = 5
+
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ self.agents = []
+ for i in range(self.agent_count):
+ agent = _agentApp("agent-" + str(i), self.broker, 1)
+ agent.start_app()
+ self.agents.append(agent)
+ #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow())
+
+ def tearDown(self):
+ #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow())
+ for agent in self.agents:
+ if agent is not None:
+ agent.stop_app()
+
+
+ def test_sync_by_schema(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ _target_params=t_params)
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now subscribe to agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ index)
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ self.assertTrue(sp.get_duration() == 10)
+ self.assertTrue(sp.get_publish_interval() == 2)
+
+ subscriptions.append([sp, 0])
+ index += 1
+
+ # now wait for the duration + interval + 1 and count the updates
+ r_count = 0
+ while self.notifier.wait_for_work(10 + 2 + 1):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 2)
+ for obj in reply:
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+ obj.get_object_id() == "p1c1_key1")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package1")
+ self.assertTrue(sid.get_class_name() == "class1")
+
+ self.assertTrue(wi.get_handle() < len(subscriptions))
+ subscriptions[wi.get_handle()][1] += 1
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publish per subscription
+ self.assertTrue(r_count == 5 * len(subscriptions))
+ for ii in range(len(subscriptions)):
+ self.assertTrue(subscriptions[ii][1] == 5)
+
+ self.console.destroy(10)
+
+
+ def test_sync_by_obj_id(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ # sid = SchemaClassId.create("package1", "class1")
+ # t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_id_object("undesc-2")
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now subscribe to agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ index)
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ subscriptions.append([sp, 0])
+ index += 1
+
+ # now wait for all subscriptions to expire (duration + interval + 1 for
+ # luck)
+ r_count = 0
+ while self.notifier.wait_for_work(10 + 2 + 1):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "undesc-2")
+ # print("!!! get_params() = %s" % wi.get_params())
+ self.assertTrue(wi.get_handle() < len(subscriptions))
+ subscriptions[wi.get_handle()][1] += 1
+ # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
+ # self.assertTrue(reply.succeeded())
+ # self.assertTrue(reply.get_argument("cookie") ==
+ # wi.get_handle())
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publish per subscription
+ self.assertTrue(r_count == 5 * len(subscriptions))
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
+
+ self.console.destroy(10)
+
+
+ def test_sync_by_obj_id_schema(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now subscribe to agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ index)
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ subscriptions.append([sp, 0])
+ index += 1
+
+ # now wait for all subscriptions to expire (duration + interval + 1 for
+ # luck)
+ r_count = 0
+ while self.notifier.wait_for_work(10 + 2 + 1):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() < len(subscriptions))
+ subscriptions[wi.get_handle()][1] += 1
+ # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
+ # self.assertTrue(reply.succeeded())
+ # self.assertTrue(reply.get_argument("cookie") ==
+ # wi.get_handle())
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publish per subscription
+ self.assertTrue(r_count == 5 * len(subscriptions))
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
+
+
+
+ self.console.destroy(10)
+
+