diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-23 21:18:03 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-23 21:18:03 +0000 |
commit | e0c0f4831a07413bed73c936eb6711e937ecbab2 (patch) | |
tree | 73bea20cfd064d8a08af29da693e76a714f2bdb3 | |
parent | b73e64ab2581a04f26faa9ea0f2f57ebdd3d9287 (diff) | |
download | qpid-python-e0c0f4831a07413bed73c936eb6711e937ecbab2.tar.gz |
QPID-2883: store events until schema response arrives.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1000629 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | extras/qmf/src/py/qmf/console.py | 54 |
1 files changed, 50 insertions, 4 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index 2cda391370..15a58eddd9 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -1720,6 +1720,12 @@ class ClassKey: else: self.type = ClassKey.TYPE_DATA + def __hash__(self): + ss = self.pname + self.cname + self.getHashString() + return ss.__hash__() + + def __eq__(self, other): + return self.__repr__() == other.__repr__() #=================================================================================================== # SchemaClass @@ -3212,8 +3218,8 @@ class Agent: context.doEvent(event) else: # schema not optional and not present - self._v2SendSchemaRequest(event.classKey) - # @todo: event dropped - fix this! + if context.addPendingEvent(event): + self._v2SendSchemaRequest(event.classKey) elif kind == "_schema_id": for sid in content: @@ -3448,6 +3454,7 @@ class RequestContext(object): self.startTime = time() self.rawQueryResults = [] self.queryResults = [] + self.pendingEvents = {} self.exception = None self.waitingForSchema = None self.pendingSignal = None @@ -3493,6 +3500,45 @@ class RequestContext(object): return self.rawQueryResults.append(data) + def addPendingEvent(self, event): + """ Stores a received event that is pending a schema. Returns True if this + event is the first instance of a given schema identifier. + """ + self.cv.acquire() + try: + if event.classKey in self.pendingEvents: + self.pendingEvents[event.classKey].append((event, time())) + return False + self.pendingEvents[event.classKey] = [(event, time())] + return True + finally: + self.cv.release() + + def processPendingEvents(self): + """ Walk the pending events looking for schemas that are now + available. Remove any events that now have schema, and process them. + """ + keysToDelete = [] + events = [] + self.cv.acquire() + try: + for key in self.pendingEvents.iterkeys(): + schema = self.schemaCache.getSchema(key) + if schema: + keysToDelete.append(key) + for item in self.pendingEvents[key]: + # item is (timestamp, event-obj) tuple. + # hack: I have no idea what a valid lifetime for an event + # should be. 60 seconds??? + if (time() - item[1]) < 60: + item[0].schema = schema + events.append(item[0]) + for key in keysToDelete: + self.pendingEvents.pop(key) + finally: + self.cv.release() + for event in events: + self.doEvent(event) def doEvent(self, data): if self.notifiable: @@ -3610,7 +3656,7 @@ class RequestContext(object): def reprocess(self): """ New schema information has been added to the schema-cache. Clear our 'waiting' status - and see if we can make more progress on the raw query list. + and see if we can make more progress on any pending inbound events/objects. """ try: self.cv.acquire() @@ -3618,7 +3664,7 @@ class RequestContext(object): finally: self.cv.release() self.processV2Data() - + self.processPendingEvents() def _getSchemaIdforV2ObjectLH(self, data): """ |