diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-23 21:18:03 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-23 21:18:03 +0000 |
commit | 5f4a31f34404ac321414dd8d02f5d1b90213b6d9 (patch) | |
tree | b7f53b42471341dda6fe32c0f0cd0abd5fbc4898 | |
parent | ab25175c11c7d0e9bfa46e004d8617cb3f7a3ab7 (diff) | |
download | qpid-python-5f4a31f34404ac321414dd8d02f5d1b90213b6d9.tar.gz |
QPID-2883: store events until schema response arrives.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1000629 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 54 |
1 files changed, 50 insertions, 4 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 2cda391370..15a58eddd9 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -1720,6 +1720,12 @@ class ClassKey: else: self.type = ClassKey.TYPE_DATA + def __hash__(self): + ss = self.pname + self.cname + self.getHashString() + return ss.__hash__() + + def __eq__(self, other): + return self.__repr__() == other.__repr__() #=================================================================================================== # SchemaClass @@ -3212,8 +3218,8 @@ class Agent: context.doEvent(event) else: # schema not optional and not present - self._v2SendSchemaRequest(event.classKey) - # @todo: event dropped - fix this! + if context.addPendingEvent(event): + self._v2SendSchemaRequest(event.classKey) elif kind == "_schema_id": for sid in content: @@ -3448,6 +3454,7 @@ class RequestContext(object): self.startTime = time() self.rawQueryResults = [] self.queryResults = [] + self.pendingEvents = {} self.exception = None self.waitingForSchema = None self.pendingSignal = None @@ -3493,6 +3500,45 @@ class RequestContext(object): return self.rawQueryResults.append(data) + def addPendingEvent(self, event): + """ Stores a received event that is pending a schema. Returns True if this + event is the first instance of a given schema identifier. + """ + self.cv.acquire() + try: + if event.classKey in self.pendingEvents: + self.pendingEvents[event.classKey].append((event, time())) + return False + self.pendingEvents[event.classKey] = [(event, time())] + return True + finally: + self.cv.release() + + def processPendingEvents(self): + """ Walk the pending events looking for schemas that are now + available. Remove any events that now have schema, and process them. + """ + keysToDelete = [] + events = [] + self.cv.acquire() + try: + for key in self.pendingEvents.iterkeys(): + schema = self.schemaCache.getSchema(key) + if schema: + keysToDelete.append(key) + for item in self.pendingEvents[key]: + # item is (timestamp, event-obj) tuple. + # hack: I have no idea what a valid lifetime for an event + # should be. 60 seconds??? + if (time() - item[1]) < 60: + item[0].schema = schema + events.append(item[0]) + for key in keysToDelete: + self.pendingEvents.pop(key) + finally: + self.cv.release() + for event in events: + self.doEvent(event) def doEvent(self, data): if self.notifiable: @@ -3610,7 +3656,7 @@ class RequestContext(object): def reprocess(self): """ New schema information has been added to the schema-cache. Clear our 'waiting' status - and see if we can make more progress on the raw query list. + and see if we can make more progress on any pending inbound events/objects. """ try: self.cv.acquire() @@ -3618,7 +3664,7 @@ class RequestContext(object): finally: self.cv.release() self.processV2Data() - + self.processPendingEvents() def _getSchemaIdforV2ObjectLH(self, data): """ |