summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-09-23 21:18:03 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-09-23 21:18:03 +0000
commit5f4a31f34404ac321414dd8d02f5d1b90213b6d9 (patch)
treeb7f53b42471341dda6fe32c0f0cd0abd5fbc4898
parentab25175c11c7d0e9bfa46e004d8617cb3f7a3ab7 (diff)
downloadqpid-python-5f4a31f34404ac321414dd8d02f5d1b90213b6d9.tar.gz
QPID-2883: store events until schema response arrives.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1000629 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py54
1 files changed, 50 insertions, 4 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 2cda391370..15a58eddd9 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -1720,6 +1720,12 @@ class ClassKey:
else:
self.type = ClassKey.TYPE_DATA
+ def __hash__(self):
+ ss = self.pname + self.cname + self.getHashString()
+ return ss.__hash__()
+
+ def __eq__(self, other):
+ return self.__repr__() == other.__repr__()
#===================================================================================================
# SchemaClass
@@ -3212,8 +3218,8 @@ class Agent:
context.doEvent(event)
else:
# schema not optional and not present
- self._v2SendSchemaRequest(event.classKey)
- # @todo: event dropped - fix this!
+ if context.addPendingEvent(event):
+ self._v2SendSchemaRequest(event.classKey)
elif kind == "_schema_id":
for sid in content:
@@ -3448,6 +3454,7 @@ class RequestContext(object):
self.startTime = time()
self.rawQueryResults = []
self.queryResults = []
+ self.pendingEvents = {}
self.exception = None
self.waitingForSchema = None
self.pendingSignal = None
@@ -3493,6 +3500,45 @@ class RequestContext(object):
return
self.rawQueryResults.append(data)
+ def addPendingEvent(self, event):
+ """ Stores a received event that is pending a schema. Returns True if this
+ event is the first instance of a given schema identifier.
+ """
+ self.cv.acquire()
+ try:
+ if event.classKey in self.pendingEvents:
+ self.pendingEvents[event.classKey].append((event, time()))
+ return False
+ self.pendingEvents[event.classKey] = [(event, time())]
+ return True
+ finally:
+ self.cv.release()
+
+ def processPendingEvents(self):
+ """ Walk the pending events looking for schemas that are now
+ available. Remove any events that now have schema, and process them.
+ """
+ keysToDelete = []
+ events = []
+ self.cv.acquire()
+ try:
+ for key in self.pendingEvents.iterkeys():
+ schema = self.schemaCache.getSchema(key)
+ if schema:
+ keysToDelete.append(key)
+ for item in self.pendingEvents[key]:
+ # item is (timestamp, event-obj) tuple.
+ # hack: I have no idea what a valid lifetime for an event
+ # should be. 60 seconds???
+ if (time() - item[1]) < 60:
+ item[0].schema = schema
+ events.append(item[0])
+ for key in keysToDelete:
+ self.pendingEvents.pop(key)
+ finally:
+ self.cv.release()
+ for event in events:
+ self.doEvent(event)
def doEvent(self, data):
if self.notifiable:
@@ -3610,7 +3656,7 @@ class RequestContext(object):
def reprocess(self):
"""
New schema information has been added to the schema-cache. Clear our 'waiting' status
- and see if we can make more progress on the raw query list.
+ and see if we can make more progress on any pending inbound events/objects.
"""
try:
self.cv.acquire()
@@ -3618,7 +3664,7 @@ class RequestContext(object):
finally:
self.cv.release()
self.processV2Data()
-
+ self.processPendingEvents()
def _getSchemaIdforV2ObjectLH(self, data):
"""