summaryrefslogtreecommitdiff
path: root/python/qmf2/console.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-02-12 20:15:21 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-02-12 20:15:21 +0000
commit1827a69f01ea0a955161fd93edfa137d7b1723a4 (patch)
treeeda86ee8c8fb763c85038763c68d6892a14a1133 /python/qmf2/console.py
parentcccafbd87d6a4cf00b3511402075f7a597e3df9e (diff)
downloadqpid-python-1827a69f01ea0a955161fd93edfa137d7b1723a4.tar.gz
QPID-2261: add async query and schema prefetch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@909591 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r--python/qmf2/console.py439
1 files changed, 378 insertions, 61 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
index 441b770d53..7b4bfe9fb8 100644
--- a/python/qmf2/console.py
+++ b/python/qmf2/console.py
@@ -56,21 +56,47 @@ class _Mailbox(object):
"""
Virtual base class for all Mailbox-like objects.
"""
+ def __init__(self, console):
+ self.console = console
+ self.cid = 0
+ self.console._add_mailbox(self)
+
+ def get_address(self):
+ return self.cid
+
def deliver(self, data):
+ """
+ Invoked by Console Management thread when a message arrives for
+ this mailbox.
+ """
raise Exception("_Mailbox deliver() method must be provided")
+ def destroy(self):
+ """
+ Release the mailbox. Once called, the mailbox should no longer be
+ referenced.
+ """
+ self.console._remove_mailbox(self.cid)
-class _WaitableMailbox(_Mailbox):
+
+class _SyncMailbox(_Mailbox):
"""
A simple mailbox that allows a consumer to wait for delivery of data.
"""
- def __init__(self):
- self._data = []
+ def __init__(self, console):
+ """
+ Invoked by application thread.
+ """
+ super(_SyncMailbox, self).__init__(console)
self._cv = Condition()
+ self._data = []
self._waiting = False
def deliver(self, data):
- """ Drop data into the mailbox, waking any waiters if necessary. """
+ """
+ Drop data into the mailbox, waking any waiters if necessary.
+ Invoked by Console Management thread only.
+ """
self._cv.acquire()
try:
self._data.append(data)
@@ -81,7 +107,10 @@ class _WaitableMailbox(_Mailbox):
self._cv.release()
def fetch(self, timeout=None):
- """ Get one data item from a mailbox, with timeout. """
+ """
+ Get one data item from a mailbox, with timeout.
+ Invoked by application thread.
+ """
self._cv.acquire()
try:
if len(self._data) == 0:
@@ -93,6 +122,189 @@ class _WaitableMailbox(_Mailbox):
self._cv.release()
+class _AsyncMailbox(_Mailbox):
+ """
+ A Mailbox for asynchronous delivery, with a timeout value.
+ """
+ def __init__(self, console,
+ agent_name,
+ _timeout=None):
+ """
+ Invoked by application thread.
+ """
+ super(_AsyncMailbox, self).__init__(console)
+
+ self.agent_name = agent_name
+ self.console = console
+
+ if _timeout is None:
+ _timeout = console._reply_timeout
+ self.expiration_date = (datetime.datetime.utcnow() +
+ datetime.timedelta(seconds=_timeout))
+ console._lock.acquire()
+ try:
+ console._async_mboxes[self.cid] = self
+ finally:
+ console._lock.release()
+
+ # now that an async mbox has been created, wake the
+ # console mgmt thread so it will know about the mbox expiration
+ # date (and adjust its idle sleep period correctly)
+
+ console._wake_thread()
+
+ def deliver(self, msg):
+ """
+ """
+ raise Exception("deliver() method must be provided")
+
+ def expire(self):
+ raise Exception("expire() method must be provided")
+
+
+ def destroy(self):
+ self.console._lock.acquire()
+ try:
+ if self.cid in self.console._async_mboxes:
+ del self.console._async_mboxes[self.cid]
+ finally:
+ self.console._lock.release()
+ super(_AsyncMailbox, self).destroy()
+
+
+
+class _QueryMailbox(_AsyncMailbox):
+ """
+ A mailbox used for asynchronous query requests.
+ """
+ def __init__(self, console,
+ agent_name,
+ context,
+ target, msgkey,
+ _timeout=None):
+ """
+ Invoked by application thread.
+ """
+ super(_QueryMailbox, self).__init__(console,
+ agent_name,
+ _timeout)
+ self.target = target
+ self.msgkey = msgkey
+ self.context = context
+ self.result = []
+
+ def deliver(self, reply):
+ """
+ Process query response messages delivered to this mailbox.
+ Invoked by Console Management thread only.
+ """
+ done = False
+ objects = reply.content.get(self.msgkey)
+ if not objects:
+ done = True
+ else:
+ # convert from map to native types if needed
+ if self.target == QmfQuery.TARGET_SCHEMA_ID:
+ for sid_map in objects:
+ self.result.append(SchemaClassId.from_map(sid_map))
+
+ elif self.target == QmfQuery.TARGET_SCHEMA:
+ for schema_map in objects:
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ self.console._add_schema(schema) # add to schema cache
+ self.result.append(schema)
+
+ elif self.target == QmfQuery.TARGET_OBJECT:
+ for obj_map in objects:
+ # @todo: need the agent name - ideally from the
+ # reply message iself.
+ agent = self.console.get_agent(self.agent_name)
+ if agent:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self.console._prefetch_schema(sid, agent)
+ self.result.append(obj)
+
+
+ else:
+ # no conversion needed.
+ self.result += objects
+
+ if done:
+ # create workitem
+ # logging.error("QUERY COMPLETE for %s" % str(self.context))
+ wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ self.destroy()
+
+
+ def expire(self):
+ logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
+ datetime.datetime.utcnow())
+ # send along whatever (possibly none) has been received so far
+ wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ self.destroy()
+
+
+
+class _SchemaPrefetchMailbox(_AsyncMailbox):
+ """
+ Handles responses to schema fetches made by the console.
+ """
+ def __init__(self, console,
+ agent_name,
+ schema_id,
+ _timeout=None):
+ """
+ Invoked by application thread.
+ """
+ super(_SchemaPrefetchMailbox, self).__init__(console,
+ agent_name,
+ _timeout)
+
+ self.schema_id = schema_id
+
+
+ def deliver(self, reply):
+ """
+ Process schema response messages.
+ """
+ done = False
+ schemas = reply.content.get(MsgKey.schema)
+ if schemas:
+ for schema_map in schemas:
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ self.console._add_schema(schema) # add to schema cache
+ self.destroy()
+
+
+ def expire(self):
+ self.destroy()
+
+
##==============================================================================
## DATA MODEL
@@ -185,8 +397,8 @@ class QmfConsoleData(QmfData):
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- mbox = _WaitableMailbox()
- cid = self._agent._console._add_mailbox(mbox)
+ mbox = _SyncMailbox(self._agent._console)
+ cid = mbox.get_address()
_map = {self.KEY_OBJECT_ID:str(oid),
SchemaMethod.KEY_NAME:name}
@@ -202,7 +414,7 @@ class QmfConsoleData(QmfData):
self._agent._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._agent._console._remove_mailbox(cid)
+ mbox.destroy()
return None
# @todo async method calls!!!
@@ -211,7 +423,7 @@ class QmfConsoleData(QmfData):
logging.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
- self._agent._console._remove_mailbox(cid)
+ mbox.destroy()
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
@@ -258,7 +470,7 @@ class Agent(object):
"""
def __init__(self, name, console):
"""
- @type name: AgentId
+ @type name: string
@param name: uniquely identifies this agent in the AMQP domain.
"""
@@ -358,8 +570,8 @@ class Agent(object):
if _in_args:
_in_args = _in_args.copy()
- mbox = _WaitableMailbox()
- cid = self._console._add_mailbox(mbox)
+ mbox = _SyncMailbox(self._console)
+ cid = mbox.get_address()
_map = {SchemaMethod.KEY_NAME:name}
if _in_args:
@@ -370,7 +582,7 @@ class Agent(object):
self._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._console._remove_mailbox(cid)
+ mbox.destroy()
return None
# @todo async method calls!!!
@@ -379,7 +591,7 @@ class Agent(object):
logging.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
- self._console._remove_mailbox(cid)
+ mbox.destroy()
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
@@ -497,19 +709,20 @@ class Console(Thread):
self._announce_recvr = None
self._locate_sender = None
self._schema_cache = {}
+ self._pending_schema_req = []
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
self._next_agent_expire = None
- # lock out run() thread
- self._cv = Condition()
+ self._next_mbox_expire = None
# for passing WorkItems to the application
self._work_q = Queue.Queue()
self._work_q_put = False
# Correlation ID and mailbox storage
self._correlation_id = long(time.time()) # pseudo-randomize
self._post_office = {} # indexed by cid
-
+ self._async_mboxes = {} # indexed by cid, used to expire them
+
## Old stuff below???
#self._broker_list = []
#self.impl = qmfengine.Console()
@@ -612,15 +825,7 @@ class Console(Thread):
self._operational = False
if self.isAlive():
# kick my thread to wake it up
- logging.debug("Sending noop to wake up [%s]" % self._address)
- try:
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
- subject=self._name,
- content={"noop":"noop"})
- self._direct_sender.send( msg, sync=True )
- except SendError, e:
- logging.error(str(e))
+ self._wake_thread()
logging.debug("waiting for console receiver thread to exit")
self.join(timeout)
if self.isAlive():
@@ -651,8 +856,8 @@ class Console(Thread):
self._lock.acquire()
try:
- if agent._id in self._agent_map:
- del self._agent_map[agent._id]
+ if agent._name in self._agent_map:
+ del self._agent_map[agent._name]
finally:
self._lock.release()
@@ -672,8 +877,8 @@ class Console(Thread):
# agent not present yet - ping it with an agent_locate
- mbox = _WaitableMailbox()
- cid = self._add_mailbox(mbox)
+ mbox = _SyncMailbox(self)
+ cid = mbox.get_address()
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
msg = Message(subject="console.ind.locate." + name,
@@ -690,7 +895,7 @@ class Console(Thread):
self._topic_sender.send(msg)
except SendError, e:
logging.error(str(e))
- self._remove_mailbox(cid)
+ mbox.destroy()
return None
if timeout is None:
@@ -699,7 +904,7 @@ class Console(Thread):
new_agent = None
logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
mbox.fetch(timeout)
- self._remove_mailbox(cid)
+ mbox.destroy()
logging.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
@@ -749,15 +954,15 @@ class Console(Thread):
if not msgkey:
raise Exception("Invalid target for query: %s" % str(query))
- mbox = _WaitableMailbox()
- cid = self._add_mailbox(mbox)
+ mbox = _SyncMailbox(self)
+ cid = mbox.get_address()
try:
logging.debug("Sending Query to Agent (%s)" % time.time())
agent._send_query(query, cid)
except SendError, e:
logging.error(str(e))
- self._remove_mailbox(cid)
+ mbox.destroy()
return None
if not timeout:
@@ -802,33 +1007,74 @@ class Console(Thread):
elif target == QmfQuery.TARGET_OBJECT:
for obj_map in objects:
obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self._prefetch_schema(sid, agent)
response.append(obj)
- # @todo prefetch unknown schema
- # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
- # if sid_map:
- # sid = SchemaClassId.from_map(sid_map)
- # # if the object references a schema, fetch it
- # # schema = self._fetch_schema(sid, _agent=agent,
- # # _timeout=timeout)
- # # if not schema:
- # # logging.warning("Unknown schema, id=%s" % sid)
- # # continue
- # obj = QmfConsoleData(map_=obj_map, agent=agent,
- # _schema=schema)
- # else:
- # # no schema needed
else:
# no conversion needed.
response += objects
now = datetime.datetime.utcnow()
- self._remove_mailbox(cid)
+ mbox.destroy()
return response
+ def do_async_query(self, agent, query, app_handle, _timeout=None ):
+ """
+ """
+ query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
+ QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
+ QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
+ QmfQuery.TARGET_SCHEMA: MsgKey.schema,
+ QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
+ QmfQuery.TARGET_AGENT: MsgKey.agent_info}
+
+ target = query.get_target()
+ msgkey = query_keymap.get(target)
+ if not msgkey:
+ raise Exception("Invalid target for query: %s" % str(query))
+
+ mbox = _QueryMailbox(self,
+ agent.get_name(),
+ app_handle,
+ target, msgkey,
+ _timeout)
+ cid = mbox.get_address()
+
+ try:
+ logging.debug("Sending Query to Agent (%s)" % time.time())
+ agent._send_query(query, cid)
+ except SendError, e:
+ logging.error(str(e))
+ mbox.destroy()
+ return False
+ return True
+
+
+ def _wake_thread(self):
+ """
+ Make the console management thread loop wakeup from its next_receiver
+ sleep.
+ """
+ logging.debug("Sending noop to wake up [%s]" % self._address)
+ msg = Message(properties={"method":"request",
+ "qmf.subject":make_subject(OpCode.noop)},
+ subject=self._name,
+ content={"noop":"noop"})
+ try:
+ self._direct_sender.send( msg, sync=True )
+ except SendError, e:
+ logging.error(str(e))
+
def run(self):
+ """
+ Console Management Thread main loop.
+ Handles inbound messages, agent discovery, async mailbox timeouts.
+ """
global _callback_thread
self._ready.set()
@@ -858,6 +1104,7 @@ class Console(Thread):
self._dispatch(msg, _direct=True)
self._expire_agents() # check for expired agents
+ self._expire_mboxes() # check for expired async mailbox requests
#if qLen == 0 and self._work_q.qsize() and self._notifier:
if self._work_q_put and self._notifier:
@@ -869,11 +1116,15 @@ class Console(Thread):
_callback_thread = None
if self._operational:
- # wait for a message to arrive or an agent
- # to expire
+ # wait for a message to arrive, or an agent
+ # to expire, or a mailbox requrest to time out
now = datetime.datetime.utcnow()
- if self._next_agent_expire > now:
- timeout = timedelta_to_secs(self._next_agent_expire - now)
+ next_expire = self._next_agent_expire
+ if (self._next_mbox_expire and
+ self._next_mbox_expire < next_expire):
+ next_expire = self._next_mbox_expire
+ if next_expire > now:
+ timeout = timedelta_to_secs(next_expire - now)
try:
logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
xxx = self._session.next_receiver(timeout = timeout)
@@ -1089,7 +1340,8 @@ class Console(Thread):
return
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ logging.debug("waking waiters for correlation id %s" %
+ msg.correlation_id)
mbox.deliver(msg)
@@ -1151,6 +1403,36 @@ class Console(Thread):
self._work_q_put = True
+ def _expire_mboxes(self):
+ """
+ Check all async mailboxes for outstanding requests that have expired.
+ """
+ now = datetime.datetime.utcnow()
+ if self._next_mbox_expire and now < self._next_mbox_expire:
+ return
+ expired_mboxes = []
+ self._next_mbox_expire = None
+ self._lock.acquire()
+ try:
+ for mbox in self._async_mboxes.itervalues():
+ if now >= mbox.expiration_date:
+ expired_mboxes.append(mbox)
+ else:
+ if (self._next_mbox_expire is None or
+ mbox.expiration_date < self._next_mbox_expire):
+ self._next_mbox_expire = mbox.expiration_date
+
+ for mbox in expired_mboxes:
+ del self._async_mboxes[mbox.cid]
+ finally:
+ self._lock.release()
+
+ for mbox in expired_mboxes:
+ # note: expire() may deallocate the mbox, so don't touch
+ # it further.
+ mbox.expire()
+
+
def _expire_agents(self):
"""
Check for expired agents and issue notifications when they expire.
@@ -1288,9 +1570,43 @@ class Console(Thread):
sid = schema.get_class_id()
if not self._schema_cache.has_key(sid):
self._schema_cache[sid] = schema
+ if sid in self._pending_schema_req:
+ self._pending_schema_req.remove(sid)
+ finally:
+ self._lock.release()
+
+ def _prefetch_schema(self, schema_id, agent):
+ """
+ Send an async request for the schema identified by schema_id if the
+ schema is not available in the cache.
+ """
+ need_fetch = False
+ self._lock.acquire()
+ try:
+ if ((not self._schema_cache.has_key(schema_id)) and
+ schema_id not in self._pending_schema_req):
+ self._pending_schema_req.append(schema_id)
+ need_fetch = True
finally:
self._lock.release()
+ if need_fetch:
+ mbox = _SchemaPrefetchMailbox(self, agent.get_name(),
+ schema_id)
+ query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
+ logging.debug("Sending Schema Query to Agent (%s)" % time.time())
+ try:
+ agent._send_query(query, mbox.get_address())
+ except SendError, e:
+ logging.error(str(e))
+ mbox.destroy()
+ self._lock.acquire()
+ try:
+ self._pending_schema_req.remove(schema_id)
+ finally:
+ self._lock.release()
+
+
def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
"""
Find the schema identified by schema_id. If not in the cache, ask the
@@ -1320,16 +1636,16 @@ class Console(Thread):
return None
def _add_mailbox(self, mbox):
- """ Add a mailbox to the post office, return a unique identifier """
- cid = 0
+ """
+ Add a mailbox to the post office, and assign it a unique address.
+ """
self._lock.acquire()
try:
- cid = self._correlation_id
+ mbox.cid = self._correlation_id
self._correlation_id += 1
- self._post_office[cid] = mbox
+ self._post_office[mbox.cid] = mbox
finally:
self._lock.release()
- return cid
def _get_mailbox(self, mid):
try:
@@ -1355,7 +1671,8 @@ class Console(Thread):
self._lock.acquire()
try:
- del self._post_office[mid]
+ if mid in self._post_office:
+ del self._post_office[mid]
finally:
self._lock.release()