diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-04 19:38:55 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-04 19:38:55 +0000 |
commit | ab53c7c55069282cb6ec1e38c17b46112c9d21f1 (patch) | |
tree | f1bd78327102a1897d2e9b1ffc781e8f40424c26 /python/qmf2/console.py | |
parent | 62b6eaea83d81155695d19dc716ad97094e89e54 (diff) | |
download | qpid-python-ab53c7c55069282cb6ec1e38c17b46112c9d21f1.tar.gz |
QPID-2261: add multi-msg query response support. Fix mailbox code to allow mult-msg per correlation id.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@906615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r-- | python/qmf2/console.py | 384 |
1 files changed, 175 insertions, 209 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py index df0f93f1da..441b770d53 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -44,146 +44,56 @@ _callback_thread=None ##============================================================================== -## Sequence Manager +## Console Transaction Management +## +## At any given time, a console application may have multiple outstanding +## message transactions with agents. The following objects allow the console +## to track these outstanding transactions. ##============================================================================== + class _Mailbox(object): """ - Virtual base class for all Mailbox-like objects + Virtual base class for all Mailbox-like objects. + """ + def deliver(self, data): + raise Exception("_Mailbox deliver() method must be provided") + + +class _WaitableMailbox(_Mailbox): + """ + A simple mailbox that allows a consumer to wait for delivery of data. """ def __init__(self): - self._msgs = [] + self._data = [] self._cv = Condition() self._waiting = False - def deliver(self, obj): + def deliver(self, data): + """ Drop data into the mailbox, waking any waiters if necessary. """ self._cv.acquire() try: - self._msgs.append(obj) + self._data.append(data) # if was empty, notify waiters - if len(self._msgs) == 1: + if len(self._data) == 1: self._cv.notify() finally: self._cv.release() def fetch(self, timeout=None): + """ Get one data item from a mailbox, with timeout. """ self._cv.acquire() try: - if len(self._msgs) == 0: + if len(self._data) == 0: self._cv.wait(timeout) - if len(self._msgs): - return self._msgs.pop() + if len(self._data): + return self._data.pop(0) return None finally: self._cv.release() -class SequencedWaiter(object): - """ - Manage sequence numbers for asynchronous method calls. - Allows the caller to associate a generic piece of data with a unique sequence - number.""" - - def __init__(self): - self.lock = Lock() - self.sequence = long(time.time()) # pseudo-randomize seq start - self.pending = {} - - - def allocate(self): - """ - Reserve a sequence number. - - @rtype: long - @return: a unique nonzero sequence number. - """ - self.lock.acquire() - try: - seq = self.sequence - self.sequence = self.sequence + 1 - self.pending[seq] = _Mailbox() - finally: - self.lock.release() - logging.debug( "sequence %d allocated" % seq) - return seq - - - def put_data(self, seq, new_data): - seq = long(seq) - logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) ) - self.lock.acquire() - try: - if seq in self.pending: - # logging.error("Putting seq %d @ %s" % (seq,time.time())) - self.pending[seq].deliver(new_data) - else: - logging.error( "seq %d not found!" % seq ) - finally: - self.lock.release() - - - - def get_data(self, seq, timeout=None): - """ - Release a sequence number reserved using the reserve method. This must - be called when the sequence is no longer needed. - - @type seq: int - @param seq: a sequence previously allocated by calling reserve(). - @rtype: any - @return: the data originally associated with the reserved sequence number. - """ - seq = long(seq) - logging.debug( "getting data for seq=%d" % seq) - mbox = None - self.lock.acquire() - try: - if seq in self.pending: - mbox = self.pending[seq] - finally: - self.lock.release() - - # Note well: pending list is unlocked, so we can wait. - # we reference mbox locally, so it will not be released - # until we are done. - - if mbox: - d = mbox.fetch(timeout) - logging.debug( "seq %d fetched %r!" % (seq, d) ) - return d - - logging.debug( "seq %d not found!" % seq ) - return None - - - def release(self, seq): - """ - Release the sequence, and its mailbox - """ - seq = long(seq) - logging.debug( "releasing seq %d" % seq ) - self.lock.acquire() - try: - if seq in self.pending: - del self.pending[seq] - finally: - self.lock.release() - - - def is_valid(self, seq): - """ - True if seq is in use, else False (seq is unknown) - """ - seq = long(seq) - self.lock.acquire() - try: - return seq in self.pending - finally: - self.lock.release() - return False - - ##============================================================================== ## DATA MODEL ##============================================================================== @@ -275,9 +185,8 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - handle = self._agent._console._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + mbox = _WaitableMailbox() + cid = self._agent._console._add_mailbox(mbox) _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} @@ -290,10 +199,10 @@ class QmfConsoleData(QmfData): logging.debug("Sending method req to Agent (%s)" % time.time()) try: - self._agent._send_method_req(_map, handle) + self._agent._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._agent._console._req_correlation.release(handle) + self._agent._console._remove_mailbox(cid) return None # @todo async method calls!!! @@ -301,8 +210,9 @@ class QmfConsoleData(QmfData): print("ASYNC TBD") logging.debug("Waiting for response to method req (%s)" % _timeout) - replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout) - self._agent._console._req_correlation.release(handle) + replyMsg = mbox.fetch(_timeout) + self._agent._console._remove_mailbox(cid) + if not replyMsg: logging.debug("Agent method req wait timed-out.") return None @@ -376,10 +286,6 @@ class Agent(object): Low-level routine to asynchronously send a message to this agent. """ msg.reply_to = str(self._console._address) - # handle = self._console._req_correlation.allocate() - # if handle == 0: - # raise Exception("Can not allocate a correlation id!") - # msg.correlation_id = str(handle) if correlation_id: msg.correlation_id = str(correlation_id) # TRACE @@ -452,9 +358,8 @@ class Agent(object): if _in_args: _in_args = _in_args.copy() - handle = self._console._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + mbox = _WaitableMailbox() + cid = self._console._add_mailbox(mbox) _map = {SchemaMethod.KEY_NAME:name} if _in_args: @@ -462,10 +367,10 @@ class Agent(object): logging.debug("Sending method req to Agent (%s)" % time.time()) try: - self._send_method_req(_map, handle) + self._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) - self._console._req_correlation.release(handle) + self._console._remove_mailbox(cid) return None # @todo async method calls!!! @@ -473,8 +378,9 @@ class Agent(object): print("ASYNC TBD") logging.debug("Waiting for response to method req (%s)" % _timeout) - replyMsg = self._console._req_correlation.get_data(handle, _timeout) - self._console._req_correlation.release(handle) + replyMsg = mbox.fetch(_timeout) + self._console._remove_mailbox(cid) + if not replyMsg: logging.debug("Agent method req wait timed-out.") return None @@ -591,7 +497,6 @@ class Console(Thread): self._announce_recvr = None self._locate_sender = None self._schema_cache = {} - self._req_correlation = SequencedWaiter() self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout @@ -601,6 +506,10 @@ class Console(Thread): # for passing WorkItems to the application self._work_q = Queue.Queue() self._work_q_put = False + # Correlation ID and mailbox storage + self._correlation_id = long(time.time()) # pseudo-randomize + self._post_office = {} # indexed by cid + ## Old stuff below??? #self._broker_list = [] #self.impl = qmfengine.Console() @@ -763,9 +672,8 @@ class Console(Thread): # agent not present yet - ping it with an agent_locate - handle = self._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + mbox = _WaitableMailbox() + cid = self._add_mailbox(mbox) query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject="console.ind.locate." + name, @@ -773,7 +681,7 @@ class Console(Thread): "qmf.subject":make_subject(OpCode.agent_locate)}, content={MsgKey.query: query.map_encode()}) msg.reply_to = str(self._address) - msg.correlation_id = str(handle) + msg.correlation_id = str(cid) logging.debug("Sending Agent Locate (%s)" % time.time()) # TRACE #logging.error("!!! Console %s sending agent locate (%s)" % @@ -782,7 +690,7 @@ class Console(Thread): self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) - self._req_correlation.release(handle) + self._remove_mailbox(cid) return None if timeout is None: @@ -790,14 +698,15 @@ class Console(Thread): new_agent = None logging.debug("Waiting for response to Agent Locate (%s)" % timeout) - self._req_correlation.get_data( handle, timeout ) - self._req_correlation.release(handle) + mbox.fetch(timeout) + self._remove_mailbox(cid) logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: new_agent = self._agent_map.get(name) finally: self._lock.release() + return new_agent @@ -828,82 +737,96 @@ class Console(Thread): def do_query(self, agent, query, timeout=None ): """ """ + query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, + QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, + QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, + QmfQuery.TARGET_SCHEMA: MsgKey.schema, + QmfQuery.TARGET_OBJECT: MsgKey.data_obj, + QmfQuery.TARGET_AGENT: MsgKey.agent_info} target = query.get_target() - handle = self._req_correlation.allocate() - if handle == 0: - raise Exception("Can not allocate a correlation id!") + msgkey = query_keymap.get(target) + if not msgkey: + raise Exception("Invalid target for query: %s" % str(query)) + + mbox = _WaitableMailbox() + cid = self._add_mailbox(mbox) + try: logging.debug("Sending Query to Agent (%s)" % time.time()) - agent._send_query(query, handle) + agent._send_query(query, cid) except SendError, e: logging.error(str(e)) - self._req_correlation.release(handle) + self._remove_mailbox(cid) return None if not timeout: timeout = self._reply_timeout logging.debug("Waiting for response to Query (%s)" % timeout) - reply = self._req_correlation.get_data(handle, timeout) - self._req_correlation.release(handle) - if not reply: - logging.debug("Agent Query wait timed-out.") - return None + now = datetime.datetime.utcnow() + expire = now + datetime.timedelta(seconds=timeout) + + response = [] + while (expire > now): + timeout = timedelta_to_secs(expire - now) + reply = mbox.fetch(timeout) + if not reply: + logging.debug("Query wait timed-out.") + break + + objects = reply.content.get(msgkey) + if not objects: + # last response is empty + break + + # convert from map to native types if needed + if target == QmfQuery.TARGET_SCHEMA_ID: + for sid_map in objects: + response.append(SchemaClassId.from_map(sid_map)) + + elif target == QmfQuery.TARGET_SCHEMA: + for schema_map in objects: + # extract schema id, convert based on schema type + sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) + if sid_map: + sid = SchemaClassId.from_map(sid_map) + if sid: + if sid.get_type() == SchemaClassId.TYPE_DATA: + schema = SchemaObjectClass.from_map(schema_map) + else: + schema = SchemaEventClass.from_map(schema_map) + self._add_schema(schema) # add to schema cache + response.append(schema) + + elif target == QmfQuery.TARGET_OBJECT: + for obj_map in objects: + obj = QmfConsoleData(map_=obj_map, agent=agent) + response.append(obj) + # @todo prefetch unknown schema + # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) + # if sid_map: + # sid = SchemaClassId.from_map(sid_map) + # # if the object references a schema, fetch it + # # schema = self._fetch_schema(sid, _agent=agent, + # # _timeout=timeout) + # # if not schema: + # # logging.warning("Unknown schema, id=%s" % sid) + # # continue + # obj = QmfConsoleData(map_=obj_map, agent=agent, + # _schema=schema) + # else: + # # no schema needed + else: + # no conversion needed. + response += objects + + now = datetime.datetime.utcnow() + + self._remove_mailbox(cid) + return response + - if target == QmfQuery.TARGET_PACKAGES: - # simply pass back the list of package names - logging.debug("Response to Packet Query received") - return reply.content.get(MsgKey.package_info) - elif target == QmfQuery.TARGET_OBJECT_ID: - # simply pass back the list of object_id's - logging.debug("Response to Object Id Query received") - return reply.content.get(MsgKey.object_id) - elif target == QmfQuery.TARGET_SCHEMA_ID: - logging.debug("Response to Schema Id Query received") - id_list = [] - for sid_map in reply.content.get(MsgKey.schema_id): - id_list.append(SchemaClassId.from_map(sid_map)) - return id_list - elif target == QmfQuery.TARGET_SCHEMA: - logging.debug("Response to Schema Query received") - schema_list = [] - for schema_map in reply.content.get(MsgKey.schema): - # extract schema id, convert based on schema type - sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) - if sid_map: - sid = SchemaClassId.from_map(sid_map) - if sid: - if sid.get_type() == SchemaClassId.TYPE_DATA: - schema = SchemaObjectClass.from_map(schema_map) - else: - schema = SchemaEventClass.from_map(schema_map) - schema_list.append(schema) - self._add_schema(schema) - return schema_list - elif target == QmfQuery.TARGET_OBJECT: - logging.debug("Response to Object Query received") - obj_list = [] - for obj_map in reply.content.get(MsgKey.data_obj): - obj = QmfConsoleData(map_=obj_map, agent=agent) - obj_list.append(obj) - # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) - # if sid_map: - # sid = SchemaClassId.from_map(sid_map) - # # if the object references a schema, fetch it - # # schema = self._fetch_schema(sid, _agent=agent, - # # _timeout=timeout) - # # if not schema: - # # logging.warning("Unknown schema, id=%s" % sid) - # # continue - # obj = QmfConsoleData(map_=obj_map, agent=agent, - # _schema=schema) - # else: - # # no schema needed - return obj_list - else: - logging.warning("Unexpected Target for a Query: '%s'" % target) - return None def run(self): global _callback_thread @@ -1115,7 +1038,8 @@ class Console(Thread): correlated = False if msg.correlation_id: - correlated = self._req_correlation.is_valid(msg.correlation_id) + mbox = self._get_mailbox(msg.correlation_id) + correlated = mbox is not None agent = None self._lock.acquire() @@ -1150,7 +1074,7 @@ class Console(Thread): if correlated: # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) + mbox.deliver(msg) def _handle_data_ind_msg(self, msg, cmap, version, direct): """ @@ -1158,14 +1082,15 @@ class Console(Thread): """ logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) - if not self._req_correlation.is_valid(msg.correlation_id): + mbox = self._get_mailbox(msg.correlation_id) + if not mbox: logging.debug("Data indicate received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) + mbox.deliver(msg) def _handle_response_msg(self, msg, cmap, version, direct): @@ -1175,14 +1100,15 @@ class Console(Thread): # @todo code replication - clean me. logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) - if not self._req_correlation.is_valid(msg.correlation_id): + mbox = self._get_mailbox(msg.correlation_id) + if not mbox: logging.debug("Response msg received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - self._req_correlation.put_data(msg.correlation_id, msg) + mbox.deliver(msg) def _handle_event_ind_msg(self, msg, cmap, version, _direct): ei_map = cmap.get(MsgKey.event) @@ -1393,6 +1319,46 @@ class Console(Thread): else: return None + def _add_mailbox(self, mbox): + """ Add a mailbox to the post office, return a unique identifier """ + cid = 0 + self._lock.acquire() + try: + cid = self._correlation_id + self._correlation_id += 1 + self._post_office[cid] = mbox + finally: + self._lock.release() + return cid + + def _get_mailbox(self, mid): + try: + mid = long(mid) + except TypeError: + logging.error("Invalid mailbox id: %s" % str(mid)) + return None + + self._lock.acquire() + try: + return self._post_office.get(mid) + finally: + self._lock.release() + + + def _remove_mailbox(self, mid): + """ Remove a mailbox and its address from the post office """ + try: + mid = long(mid) + except TypeError: + logging.error("Invalid mailbox id: %s" % str(mid)) + return None + + self._lock.acquire() + try: + del self._post_office[mid] + finally: + self._lock.release() + def __repr__(self): return str(self._address) |