summaryrefslogtreecommitdiff
path: root/python/qmf2/console.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-02-04 19:38:55 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-02-04 19:38:55 +0000
commitab53c7c55069282cb6ec1e38c17b46112c9d21f1 (patch)
treef1bd78327102a1897d2e9b1ffc781e8f40424c26 /python/qmf2/console.py
parent62b6eaea83d81155695d19dc716ad97094e89e54 (diff)
downloadqpid-python-ab53c7c55069282cb6ec1e38c17b46112c9d21f1.tar.gz
QPID-2261: add multi-msg query response support. Fix mailbox code to allow mult-msg per correlation id.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@906615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r--python/qmf2/console.py384
1 files changed, 175 insertions, 209 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
index df0f93f1da..441b770d53 100644
--- a/python/qmf2/console.py
+++ b/python/qmf2/console.py
@@ -44,146 +44,56 @@ _callback_thread=None
##==============================================================================
-## Sequence Manager
+## Console Transaction Management
+##
+## At any given time, a console application may have multiple outstanding
+## message transactions with agents. The following objects allow the console
+## to track these outstanding transactions.
##==============================================================================
+
class _Mailbox(object):
"""
- Virtual base class for all Mailbox-like objects
+ Virtual base class for all Mailbox-like objects.
+ """
+ def deliver(self, data):
+ raise Exception("_Mailbox deliver() method must be provided")
+
+
+class _WaitableMailbox(_Mailbox):
+ """
+ A simple mailbox that allows a consumer to wait for delivery of data.
"""
def __init__(self):
- self._msgs = []
+ self._data = []
self._cv = Condition()
self._waiting = False
- def deliver(self, obj):
+ def deliver(self, data):
+ """ Drop data into the mailbox, waking any waiters if necessary. """
self._cv.acquire()
try:
- self._msgs.append(obj)
+ self._data.append(data)
# if was empty, notify waiters
- if len(self._msgs) == 1:
+ if len(self._data) == 1:
self._cv.notify()
finally:
self._cv.release()
def fetch(self, timeout=None):
+ """ Get one data item from a mailbox, with timeout. """
self._cv.acquire()
try:
- if len(self._msgs) == 0:
+ if len(self._data) == 0:
self._cv.wait(timeout)
- if len(self._msgs):
- return self._msgs.pop()
+ if len(self._data):
+ return self._data.pop(0)
return None
finally:
self._cv.release()
-class SequencedWaiter(object):
- """
- Manage sequence numbers for asynchronous method calls.
- Allows the caller to associate a generic piece of data with a unique sequence
- number."""
-
- def __init__(self):
- self.lock = Lock()
- self.sequence = long(time.time()) # pseudo-randomize seq start
- self.pending = {}
-
-
- def allocate(self):
- """
- Reserve a sequence number.
-
- @rtype: long
- @return: a unique nonzero sequence number.
- """
- self.lock.acquire()
- try:
- seq = self.sequence
- self.sequence = self.sequence + 1
- self.pending[seq] = _Mailbox()
- finally:
- self.lock.release()
- logging.debug( "sequence %d allocated" % seq)
- return seq
-
-
- def put_data(self, seq, new_data):
- seq = long(seq)
- logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) )
- self.lock.acquire()
- try:
- if seq in self.pending:
- # logging.error("Putting seq %d @ %s" % (seq,time.time()))
- self.pending[seq].deliver(new_data)
- else:
- logging.error( "seq %d not found!" % seq )
- finally:
- self.lock.release()
-
-
-
- def get_data(self, seq, timeout=None):
- """
- Release a sequence number reserved using the reserve method. This must
- be called when the sequence is no longer needed.
-
- @type seq: int
- @param seq: a sequence previously allocated by calling reserve().
- @rtype: any
- @return: the data originally associated with the reserved sequence number.
- """
- seq = long(seq)
- logging.debug( "getting data for seq=%d" % seq)
- mbox = None
- self.lock.acquire()
- try:
- if seq in self.pending:
- mbox = self.pending[seq]
- finally:
- self.lock.release()
-
- # Note well: pending list is unlocked, so we can wait.
- # we reference mbox locally, so it will not be released
- # until we are done.
-
- if mbox:
- d = mbox.fetch(timeout)
- logging.debug( "seq %d fetched %r!" % (seq, d) )
- return d
-
- logging.debug( "seq %d not found!" % seq )
- return None
-
-
- def release(self, seq):
- """
- Release the sequence, and its mailbox
- """
- seq = long(seq)
- logging.debug( "releasing seq %d" % seq )
- self.lock.acquire()
- try:
- if seq in self.pending:
- del self.pending[seq]
- finally:
- self.lock.release()
-
-
- def is_valid(self, seq):
- """
- True if seq is in use, else False (seq is unknown)
- """
- seq = long(seq)
- self.lock.acquire()
- try:
- return seq in self.pending
- finally:
- self.lock.release()
- return False
-
-
##==============================================================================
## DATA MODEL
##==============================================================================
@@ -275,9 +185,8 @@ class QmfConsoleData(QmfData):
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- handle = self._agent._console._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ mbox = _WaitableMailbox()
+ cid = self._agent._console._add_mailbox(mbox)
_map = {self.KEY_OBJECT_ID:str(oid),
SchemaMethod.KEY_NAME:name}
@@ -290,10 +199,10 @@ class QmfConsoleData(QmfData):
logging.debug("Sending method req to Agent (%s)" % time.time())
try:
- self._agent._send_method_req(_map, handle)
+ self._agent._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._agent._console._req_correlation.release(handle)
+ self._agent._console._remove_mailbox(cid)
return None
# @todo async method calls!!!
@@ -301,8 +210,9 @@ class QmfConsoleData(QmfData):
print("ASYNC TBD")
logging.debug("Waiting for response to method req (%s)" % _timeout)
- replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout)
- self._agent._console._req_correlation.release(handle)
+ replyMsg = mbox.fetch(_timeout)
+ self._agent._console._remove_mailbox(cid)
+
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
return None
@@ -376,10 +286,6 @@ class Agent(object):
Low-level routine to asynchronously send a message to this agent.
"""
msg.reply_to = str(self._console._address)
- # handle = self._console._req_correlation.allocate()
- # if handle == 0:
- # raise Exception("Can not allocate a correlation id!")
- # msg.correlation_id = str(handle)
if correlation_id:
msg.correlation_id = str(correlation_id)
# TRACE
@@ -452,9 +358,8 @@ class Agent(object):
if _in_args:
_in_args = _in_args.copy()
- handle = self._console._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ mbox = _WaitableMailbox()
+ cid = self._console._add_mailbox(mbox)
_map = {SchemaMethod.KEY_NAME:name}
if _in_args:
@@ -462,10 +367,10 @@ class Agent(object):
logging.debug("Sending method req to Agent (%s)" % time.time())
try:
- self._send_method_req(_map, handle)
+ self._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._console._req_correlation.release(handle)
+ self._console._remove_mailbox(cid)
return None
# @todo async method calls!!!
@@ -473,8 +378,9 @@ class Agent(object):
print("ASYNC TBD")
logging.debug("Waiting for response to method req (%s)" % _timeout)
- replyMsg = self._console._req_correlation.get_data(handle, _timeout)
- self._console._req_correlation.release(handle)
+ replyMsg = mbox.fetch(_timeout)
+ self._console._remove_mailbox(cid)
+
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
return None
@@ -591,7 +497,6 @@ class Console(Thread):
self._announce_recvr = None
self._locate_sender = None
self._schema_cache = {}
- self._req_correlation = SequencedWaiter()
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
@@ -601,6 +506,10 @@ class Console(Thread):
# for passing WorkItems to the application
self._work_q = Queue.Queue()
self._work_q_put = False
+ # Correlation ID and mailbox storage
+ self._correlation_id = long(time.time()) # pseudo-randomize
+ self._post_office = {} # indexed by cid
+
## Old stuff below???
#self._broker_list = []
#self.impl = qmfengine.Console()
@@ -763,9 +672,8 @@ class Console(Thread):
# agent not present yet - ping it with an agent_locate
- handle = self._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ mbox = _WaitableMailbox()
+ cid = self._add_mailbox(mbox)
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
msg = Message(subject="console.ind.locate." + name,
@@ -773,7 +681,7 @@ class Console(Thread):
"qmf.subject":make_subject(OpCode.agent_locate)},
content={MsgKey.query: query.map_encode()})
msg.reply_to = str(self._address)
- msg.correlation_id = str(handle)
+ msg.correlation_id = str(cid)
logging.debug("Sending Agent Locate (%s)" % time.time())
# TRACE
#logging.error("!!! Console %s sending agent locate (%s)" %
@@ -782,7 +690,7 @@ class Console(Thread):
self._topic_sender.send(msg)
except SendError, e:
logging.error(str(e))
- self._req_correlation.release(handle)
+ self._remove_mailbox(cid)
return None
if timeout is None:
@@ -790,14 +698,15 @@ class Console(Thread):
new_agent = None
logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
- self._req_correlation.get_data( handle, timeout )
- self._req_correlation.release(handle)
+ mbox.fetch(timeout)
+ self._remove_mailbox(cid)
logging.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
new_agent = self._agent_map.get(name)
finally:
self._lock.release()
+
return new_agent
@@ -828,82 +737,96 @@ class Console(Thread):
def do_query(self, agent, query, timeout=None ):
"""
"""
+ query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
+ QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
+ QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
+ QmfQuery.TARGET_SCHEMA: MsgKey.schema,
+ QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
+ QmfQuery.TARGET_AGENT: MsgKey.agent_info}
target = query.get_target()
- handle = self._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ msgkey = query_keymap.get(target)
+ if not msgkey:
+ raise Exception("Invalid target for query: %s" % str(query))
+
+ mbox = _WaitableMailbox()
+ cid = self._add_mailbox(mbox)
+
try:
logging.debug("Sending Query to Agent (%s)" % time.time())
- agent._send_query(query, handle)
+ agent._send_query(query, cid)
except SendError, e:
logging.error(str(e))
- self._req_correlation.release(handle)
+ self._remove_mailbox(cid)
return None
if not timeout:
timeout = self._reply_timeout
logging.debug("Waiting for response to Query (%s)" % timeout)
- reply = self._req_correlation.get_data(handle, timeout)
- self._req_correlation.release(handle)
- if not reply:
- logging.debug("Agent Query wait timed-out.")
- return None
+ now = datetime.datetime.utcnow()
+ expire = now + datetime.timedelta(seconds=timeout)
+
+ response = []
+ while (expire > now):
+ timeout = timedelta_to_secs(expire - now)
+ reply = mbox.fetch(timeout)
+ if not reply:
+ logging.debug("Query wait timed-out.")
+ break
+
+ objects = reply.content.get(msgkey)
+ if not objects:
+ # last response is empty
+ break
+
+ # convert from map to native types if needed
+ if target == QmfQuery.TARGET_SCHEMA_ID:
+ for sid_map in objects:
+ response.append(SchemaClassId.from_map(sid_map))
+
+ elif target == QmfQuery.TARGET_SCHEMA:
+ for schema_map in objects:
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ self._add_schema(schema) # add to schema cache
+ response.append(schema)
+
+ elif target == QmfQuery.TARGET_OBJECT:
+ for obj_map in objects:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ response.append(obj)
+ # @todo prefetch unknown schema
+ # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+ # if sid_map:
+ # sid = SchemaClassId.from_map(sid_map)
+ # # if the object references a schema, fetch it
+ # # schema = self._fetch_schema(sid, _agent=agent,
+ # # _timeout=timeout)
+ # # if not schema:
+ # # logging.warning("Unknown schema, id=%s" % sid)
+ # # continue
+ # obj = QmfConsoleData(map_=obj_map, agent=agent,
+ # _schema=schema)
+ # else:
+ # # no schema needed
+ else:
+ # no conversion needed.
+ response += objects
+
+ now = datetime.datetime.utcnow()
+
+ self._remove_mailbox(cid)
+ return response
+
- if target == QmfQuery.TARGET_PACKAGES:
- # simply pass back the list of package names
- logging.debug("Response to Packet Query received")
- return reply.content.get(MsgKey.package_info)
- elif target == QmfQuery.TARGET_OBJECT_ID:
- # simply pass back the list of object_id's
- logging.debug("Response to Object Id Query received")
- return reply.content.get(MsgKey.object_id)
- elif target == QmfQuery.TARGET_SCHEMA_ID:
- logging.debug("Response to Schema Id Query received")
- id_list = []
- for sid_map in reply.content.get(MsgKey.schema_id):
- id_list.append(SchemaClassId.from_map(sid_map))
- return id_list
- elif target == QmfQuery.TARGET_SCHEMA:
- logging.debug("Response to Schema Query received")
- schema_list = []
- for schema_map in reply.content.get(MsgKey.schema):
- # extract schema id, convert based on schema type
- sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
- if sid_map:
- sid = SchemaClassId.from_map(sid_map)
- if sid:
- if sid.get_type() == SchemaClassId.TYPE_DATA:
- schema = SchemaObjectClass.from_map(schema_map)
- else:
- schema = SchemaEventClass.from_map(schema_map)
- schema_list.append(schema)
- self._add_schema(schema)
- return schema_list
- elif target == QmfQuery.TARGET_OBJECT:
- logging.debug("Response to Object Query received")
- obj_list = []
- for obj_map in reply.content.get(MsgKey.data_obj):
- obj = QmfConsoleData(map_=obj_map, agent=agent)
- obj_list.append(obj)
- # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
- # if sid_map:
- # sid = SchemaClassId.from_map(sid_map)
- # # if the object references a schema, fetch it
- # # schema = self._fetch_schema(sid, _agent=agent,
- # # _timeout=timeout)
- # # if not schema:
- # # logging.warning("Unknown schema, id=%s" % sid)
- # # continue
- # obj = QmfConsoleData(map_=obj_map, agent=agent,
- # _schema=schema)
- # else:
- # # no schema needed
- return obj_list
- else:
- logging.warning("Unexpected Target for a Query: '%s'" % target)
- return None
def run(self):
global _callback_thread
@@ -1115,7 +1038,8 @@ class Console(Thread):
correlated = False
if msg.correlation_id:
- correlated = self._req_correlation.is_valid(msg.correlation_id)
+ mbox = self._get_mailbox(msg.correlation_id)
+ correlated = mbox is not None
agent = None
self._lock.acquire()
@@ -1150,7 +1074,7 @@ class Console(Thread):
if correlated:
# wake up all waiters
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
+ mbox.deliver(msg)
def _handle_data_ind_msg(self, msg, cmap, version, direct):
"""
@@ -1158,14 +1082,15 @@ class Console(Thread):
"""
logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
- if not self._req_correlation.is_valid(msg.correlation_id):
+ mbox = self._get_mailbox(msg.correlation_id)
+ if not mbox:
logging.debug("Data indicate received with unknown correlation_id"
" msg='%s'" % str(msg))
return
# wake up all waiters
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
+ mbox.deliver(msg)
def _handle_response_msg(self, msg, cmap, version, direct):
@@ -1175,14 +1100,15 @@ class Console(Thread):
# @todo code replication - clean me.
logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
- if not self._req_correlation.is_valid(msg.correlation_id):
+ mbox = self._get_mailbox(msg.correlation_id)
+ if not mbox:
logging.debug("Response msg received with unknown correlation_id"
" msg='%s'" % str(msg))
return
# wake up all waiters
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
+ mbox.deliver(msg)
def _handle_event_ind_msg(self, msg, cmap, version, _direct):
ei_map = cmap.get(MsgKey.event)
@@ -1393,6 +1319,46 @@ class Console(Thread):
else:
return None
+ def _add_mailbox(self, mbox):
+ """ Add a mailbox to the post office, return a unique identifier """
+ cid = 0
+ self._lock.acquire()
+ try:
+ cid = self._correlation_id
+ self._correlation_id += 1
+ self._post_office[cid] = mbox
+ finally:
+ self._lock.release()
+ return cid
+
+ def _get_mailbox(self, mid):
+ try:
+ mid = long(mid)
+ except TypeError:
+ logging.error("Invalid mailbox id: %s" % str(mid))
+ return None
+
+ self._lock.acquire()
+ try:
+ return self._post_office.get(mid)
+ finally:
+ self._lock.release()
+
+
+ def _remove_mailbox(self, mid):
+ """ Remove a mailbox and its address from the post office """
+ try:
+ mid = long(mid)
+ except TypeError:
+ logging.error("Invalid mailbox id: %s" % str(mid))
+ return None
+
+ self._lock.acquire()
+ try:
+ del self._post_office[mid]
+ finally:
+ self._lock.release()
+
def __repr__(self):
return str(self._address)