diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-12 20:15:21 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-12 20:15:21 +0000 |
commit | 1827a69f01ea0a955161fd93edfa137d7b1723a4 (patch) | |
tree | eda86ee8c8fb763c85038763c68d6892a14a1133 /python/qmf2/console.py | |
parent | cccafbd87d6a4cf00b3511402075f7a597e3df9e (diff) | |
download | qpid-python-1827a69f01ea0a955161fd93edfa137d7b1723a4.tar.gz |
QPID-2261: add async query and schema prefetch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@909591 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r-- | python/qmf2/console.py | 439 |
1 files changed, 378 insertions, 61 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 441b770d53..7b4bfe9fb8 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -56,21 +56,47 @@ class _Mailbox(object): """ Virtual base class for all Mailbox-like objects. """ + def __init__(self, console): + self.console = console + self.cid = 0 + self.console._add_mailbox(self) + + def get_address(self): + return self.cid + def deliver(self, data): + """ + Invoked by Console Management thread when a message arrives for + this mailbox. + """ raise Exception("_Mailbox deliver() method must be provided") + def destroy(self): + """ + Release the mailbox. Once called, the mailbox should no longer be + referenced. + """ + self.console._remove_mailbox(self.cid) -class _WaitableMailbox(_Mailbox): + +class _SyncMailbox(_Mailbox): """ A simple mailbox that allows a consumer to wait for delivery of data. """ - def __init__(self): - self._data = [] + def __init__(self, console): + """ + Invoked by application thread. + """ + super(_SyncMailbox, self).__init__(console) self._cv = Condition() + self._data = [] self._waiting = False def deliver(self, data): - """ Drop data into the mailbox, waking any waiters if necessary. """ + """ + Drop data into the mailbox, waking any waiters if necessary. + Invoked by Console Management thread only. + """ self._cv.acquire() try: self._data.append(data) @@ -81,7 +107,10 @@ class _WaitableMailbox(_Mailbox): self._cv.release() def fetch(self, timeout=None): - """ Get one data item from a mailbox, with timeout. """ + """ + Get one data item from a mailbox, with timeout. + Invoked by application thread. + """ self._cv.acquire() try: if len(self._data) == 0: @@ -93,6 +122,189 @@ class _WaitableMailbox(_Mailbox): self._cv.release() +class _AsyncMailbox(_Mailbox): + """ + A Mailbox for asynchronous delivery, with a timeout value. + """ + def __init__(self, console, + agent_name, + _timeout=None): + """ + Invoked by application thread. + """ + super(_AsyncMailbox, self).__init__(console) + + self.agent_name = agent_name + self.console = console + + if _timeout is None: + _timeout = console._reply_timeout + self.expiration_date = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=_timeout)) + console._lock.acquire() + try: + console._async_mboxes[self.cid] = self + finally: + console._lock.release() + + # now that an async mbox has been created, wake the + # console mgmt thread so it will know about the mbox expiration + # date (and adjust its idle sleep period correctly) + + console._wake_thread() + + def deliver(self, msg): + """ + """ + raise Exception("deliver() method must be provided") + + def expire(self): + raise Exception("expire() method must be provided") + + + def destroy(self): + self.console._lock.acquire() + try: + if self.cid in self.console._async_mboxes: + del self.console._async_mboxes[self.cid] + finally: + self.console._lock.release() + super(_AsyncMailbox, self).destroy() + + + +class _QueryMailbox(_AsyncMailbox): + """ + A mailbox used for asynchronous query requests. + """ + def __init__(self, console, + agent_name, + context, + target, msgkey, + _timeout=None): + """ + Invoked by application thread. + """ + super(_QueryMailbox, self).__init__(console, + agent_name, + _timeout) + self.target = target + self.msgkey = msgkey + self.context = context + self.result = [] + + def deliver(self, reply): + """ + Process query response messages delivered to this mailbox. + Invoked by Console Management thread only. + """ + done = False + objects = reply.content.get(self.msgkey) + if not objects: + done = True + else: + # convert from map to native types if needed + if self.target == QmfQuery.TARGET_SCHEMA_ID: + for sid_map in objects: + self.result.append(SchemaClassId.from_map(sid_map)) + + elif self.target == QmfQuery.TARGET_SCHEMA: + for schema_map in objects: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self.console._add_schema(schema) # add to schema cache + self.result.append(schema) + + elif self.target == QmfQuery.TARGET_OBJECT: + for obj_map in objects: + # @todo: need the agent name - ideally from the + # reply message iself. + agent = self.console.get_agent(self.agent_name) + if agent: + obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self.console._prefetch_schema(sid, agent) + self.result.append(obj) + + + else: + # no conversion needed. + self.result += objects + + if done: + # create workitem + # logging.error("QUERY COMPLETE for %s" % str(self.context)) + wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + def expire(self): + logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % + datetime.datetime.utcnow()) + # send along whatever (possibly none) has been received so far + wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + +class _SchemaPrefetchMailbox(_AsyncMailbox): + """ + Handles responses to schema fetches made by the console. + """ + def __init__(self, console, + agent_name, + schema_id, + _timeout=None): + """ + Invoked by application thread. + """ + super(_SchemaPrefetchMailbox, self).__init__(console, + agent_name, + _timeout) + + self.schema_id = schema_id + + + def deliver(self, reply): + """ + Process schema response messages. + """ + done = False + schemas = reply.content.get(MsgKey.schema) + if schemas: + for schema_map in schemas: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self.console._add_schema(schema) # add to schema cache + self.destroy() + + + def expire(self): + self.destroy() + + ##============================================================================== ## DATA MODEL @@ -185,8 +397,8 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - mbox = _WaitableMailbox() - cid = self._agent._console._add_mailbox(mbox) + mbox = _SyncMailbox(self._agent._console) + cid = mbox.get_address() _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} @@ -202,7 +414,7 @@ class QmfConsoleData(QmfData): self._agent._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._agent._console._remove_mailbox(cid) + mbox.destroy() return None # @todo async method calls!!! @@ -211,7 +423,7 @@ class QmfConsoleData(QmfData): logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) - self._agent._console._remove_mailbox(cid) + mbox.destroy() if not replyMsg: logging.debug("Agent method req wait timed-out.") @@ -258,7 +470,7 @@ class Agent(object): """ def __init__(self, name, console): """ - @type name: AgentId + @type name: string @param name: uniquely identifies this agent in the AMQP domain. """ @@ -358,8 +570,8 @@ class Agent(object): if _in_args: _in_args = _in_args.copy() - mbox = _WaitableMailbox() - cid = self._console._add_mailbox(mbox) + mbox = _SyncMailbox(self._console) + cid = mbox.get_address() _map = {SchemaMethod.KEY_NAME:name} if _in_args: @@ -370,7 +582,7 @@ class Agent(object): self._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._console._remove_mailbox(cid) + mbox.destroy() return None # @todo async method calls!!! @@ -379,7 +591,7 @@ class Agent(object): logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) - self._console._remove_mailbox(cid) + mbox.destroy() if not replyMsg: logging.debug("Agent method req wait timed-out.") @@ -497,19 +709,20 @@ class Console(Thread): self._announce_recvr = None self._locate_sender = None self._schema_cache = {} + self._pending_schema_req = [] self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout self._next_agent_expire = None - # lock out run() thread - self._cv = Condition() + self._next_mbox_expire = None # for passing WorkItems to the application self._work_q = Queue.Queue() self._work_q_put = False # Correlation ID and mailbox storage self._correlation_id = long(time.time()) # pseudo-randomize self._post_office = {} # indexed by cid - + self._async_mboxes = {} # indexed by cid, used to expire them + ## Old stuff below??? #self._broker_list = [] #self.impl = qmfengine.Console() @@ -612,15 +825,7 @@ class Console(Thread): self._operational = False if self.isAlive(): # kick my thread to wake it up - logging.debug("Sending noop to wake up [%s]" % self._address) - try: - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.noop)}, - subject=self._name, - content={"noop":"noop"}) - self._direct_sender.send( msg, sync=True ) - except SendError, e: - logging.error(str(e)) + self._wake_thread() logging.debug("waiting for console receiver thread to exit") self.join(timeout) if self.isAlive(): @@ -651,8 +856,8 @@ class Console(Thread): self._lock.acquire() try: - if agent._id in self._agent_map: - del self._agent_map[agent._id] + if agent._name in self._agent_map: + del self._agent_map[agent._name] finally: self._lock.release() @@ -672,8 +877,8 @@ class Console(Thread): # agent not present yet - ping it with an agent_locate - mbox = _WaitableMailbox() - cid = self._add_mailbox(mbox) + mbox = _SyncMailbox(self) + cid = mbox.get_address() query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject="console.ind.locate." + name, @@ -690,7 +895,7 @@ class Console(Thread): self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) - self._remove_mailbox(cid) + mbox.destroy() return None if timeout is None: @@ -699,7 +904,7 @@ class Console(Thread): new_agent = None logging.debug("Waiting for response to Agent Locate (%s)" % timeout) mbox.fetch(timeout) - self._remove_mailbox(cid) + mbox.destroy() logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: @@ -749,15 +954,15 @@ class Console(Thread): if not msgkey: raise Exception("Invalid target for query: %s" % str(query)) - mbox = _WaitableMailbox() - cid = self._add_mailbox(mbox) + mbox = _SyncMailbox(self) + cid = mbox.get_address() try: logging.debug("Sending Query to Agent (%s)" % time.time()) agent._send_query(query, cid) except SendError, e: logging.error(str(e)) - self._remove_mailbox(cid) + mbox.destroy() return None if not timeout: @@ -802,33 +1007,74 @@ class Console(Thread): elif target == QmfQuery.TARGET_OBJECT: for obj_map in objects: obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self._prefetch_schema(sid, agent) response.append(obj) - # @todo prefetch unknown schema - # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) - # if sid_map: - # sid = SchemaClassId.from_map(sid_map) - # # if the object references a schema, fetch it - # # schema = self._fetch_schema(sid, _agent=agent, - # # _timeout=timeout) - # # if not schema: - # # logging.warning("Unknown schema, id=%s" % sid) - # # continue - # obj = QmfConsoleData(map_=obj_map, agent=agent, - # _schema=schema) - # else: - # # no schema needed else: # no conversion needed. response += objects now = datetime.datetime.utcnow() - self._remove_mailbox(cid) + mbox.destroy() return response + def do_async_query(self, agent, query, app_handle, _timeout=None ): + """ + """ + query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, + QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, + QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, + QmfQuery.TARGET_SCHEMA: MsgKey.schema, + QmfQuery.TARGET_OBJECT: MsgKey.data_obj, + QmfQuery.TARGET_AGENT: MsgKey.agent_info} + + target = query.get_target() + msgkey = query_keymap.get(target) + if not msgkey: + raise Exception("Invalid target for query: %s" % str(query)) + + mbox = _QueryMailbox(self, + agent.get_name(), + app_handle, + target, msgkey, + _timeout) + cid = mbox.get_address() + + try: + logging.debug("Sending Query to Agent (%s)" % time.time()) + agent._send_query(query, cid) + except SendError, e: + logging.error(str(e)) + mbox.destroy() + return False + return True + + + def _wake_thread(self): + """ + Make the console management thread loop wakeup from its next_receiver + sleep. + """ + logging.debug("Sending noop to wake up [%s]" % self._address) + msg = Message(properties={"method":"request", + "qmf.subject":make_subject(OpCode.noop)}, + subject=self._name, + content={"noop":"noop"}) + try: + self._direct_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + def run(self): + """ + Console Management Thread main loop. + Handles inbound messages, agent discovery, async mailbox timeouts. + """ global _callback_thread self._ready.set() @@ -858,6 +1104,7 @@ class Console(Thread): self._dispatch(msg, _direct=True) self._expire_agents() # check for expired agents + self._expire_mboxes() # check for expired async mailbox requests #if qLen == 0 and self._work_q.qsize() and self._notifier: if self._work_q_put and self._notifier: @@ -869,11 +1116,15 @@ class Console(Thread): _callback_thread = None if self._operational: - # wait for a message to arrive or an agent - # to expire + # wait for a message to arrive, or an agent + # to expire, or a mailbox requrest to time out now = datetime.datetime.utcnow() - if self._next_agent_expire > now: - timeout = timedelta_to_secs(self._next_agent_expire - now) + next_expire = self._next_agent_expire + if (self._next_mbox_expire and + self._next_mbox_expire < next_expire): + next_expire = self._next_mbox_expire + if next_expire > now: + timeout = timedelta_to_secs(next_expire - now) try: logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) xxx = self._session.next_receiver(timeout = timeout) @@ -1089,7 +1340,8 @@ class Console(Thread): return # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + logging.debug("waking waiters for correlation id %s" % + msg.correlation_id) mbox.deliver(msg) @@ -1151,6 +1403,36 @@ class Console(Thread): self._work_q_put = True + def _expire_mboxes(self): + """ + Check all async mailboxes for outstanding requests that have expired. + """ + now = datetime.datetime.utcnow() + if self._next_mbox_expire and now < self._next_mbox_expire: + return + expired_mboxes = [] + self._next_mbox_expire = None + self._lock.acquire() + try: + for mbox in self._async_mboxes.itervalues(): + if now >= mbox.expiration_date: + expired_mboxes.append(mbox) + else: + if (self._next_mbox_expire is None or + mbox.expiration_date < self._next_mbox_expire): + self._next_mbox_expire = mbox.expiration_date + + for mbox in expired_mboxes: + del self._async_mboxes[mbox.cid] + finally: + self._lock.release() + + for mbox in expired_mboxes: + # note: expire() may deallocate the mbox, so don't touch + # it further. + mbox.expire() + + def _expire_agents(self): """ Check for expired agents and issue notifications when they expire. @@ -1288,9 +1570,43 @@ class Console(Thread): sid = schema.get_class_id() if not self._schema_cache.has_key(sid): self._schema_cache[sid] = schema + if sid in self._pending_schema_req: + self._pending_schema_req.remove(sid) + finally: + self._lock.release() + + def _prefetch_schema(self, schema_id, agent): + """ + Send an async request for the schema identified by schema_id if the + schema is not available in the cache. + """ + need_fetch = False + self._lock.acquire() + try: + if ((not self._schema_cache.has_key(schema_id)) and + schema_id not in self._pending_schema_req): + self._pending_schema_req.append(schema_id) + need_fetch = True finally: self._lock.release() + if need_fetch: + mbox = _SchemaPrefetchMailbox(self, agent.get_name(), + schema_id) + query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) + logging.debug("Sending Schema Query to Agent (%s)" % time.time()) + try: + agent._send_query(query, mbox.get_address()) + except SendError, e: + logging.error(str(e)) + mbox.destroy() + self._lock.acquire() + try: + self._pending_schema_req.remove(schema_id) + finally: + self._lock.release() + + def _fetch_schema(self, schema_id, _agent=None, _timeout=None): """ Find the schema identified by schema_id. If not in the cache, ask the @@ -1320,16 +1636,16 @@ class Console(Thread): return None def _add_mailbox(self, mbox): - """ Add a mailbox to the post office, return a unique identifier """ - cid = 0 + """ + Add a mailbox to the post office, and assign it a unique address. + """ self._lock.acquire() try: - cid = self._correlation_id + mbox.cid = self._correlation_id self._correlation_id += 1 - self._post_office[cid] = mbox + self._post_office[mbox.cid] = mbox finally: self._lock.release() - return cid def _get_mailbox(self, mid): try: @@ -1355,7 +1671,8 @@ class Console(Thread): self._lock.acquire() try: - del self._post_office[mid] + if mid in self._post_office: + del self._post_office[mid] finally: self._lock.release() |