summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-26 23:07:18 +0000
committerTed Ross <tross@apache.org>2010-03-26 23:07:18 +0000
commitbb582d2e1c91e973ac8e419b497bf165d9530f29 (patch)
tree47fe7c6be3ddcd8b93397e23dd0aa349832daec7
parentc614014627bec840f754b964a42c01cf140cb611 (diff)
downloadqpid-python-bb582d2e1c91e973ac8e419b497bf165d9530f29.tar.gz
For qmf-gen:
- Added conditional generation of QMFv1 code (for broker and plugins only) - Added nesting IF/ENDIF capability to support above For python console: - Added support for V2 and V1 get queries - Handle both possible results of race between v2 data and the schema for the data git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@928096 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/management/ManagementObject.h19
-rwxr-xr-xqpid/cpp/managementgen/qmf-gen2
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/generate.py41
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.cpp4
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.h2
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py189
6 files changed, 200 insertions, 57 deletions
diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h
index 98ef886cd1..07d039b6e3 100644
--- a/qpid/cpp/include/qpid/management/ManagementObject.h
+++ b/qpid/cpp/include/qpid/management/ManagementObject.h
@@ -178,14 +178,17 @@ protected:
virtual void doMethod(std::string& methodName,
const messaging::VariantMap& inMap,
messaging::VariantMap& outMap) = 0;
- virtual uint32_t writePropertiesSize() const = 0;
- virtual void readProperties(const std::string& buf) = 0;
- virtual void writeProperties(std::string& buf) const = 0;
- virtual void writeStatistics(std::string& buf,
- bool skipHeaders = false) = 0;
- virtual void doMethod(std::string& methodName,
- const std::string& inBuf,
- std::string& outBuf) = 0;
+
+ /**
+ * The following five methods are not pure-virtual because they will only
+ * be overridden in cases where QMFv1 is to be supported.
+ */
+ virtual uint32_t writePropertiesSize() const { return 0; }
+ virtual void readProperties(const std::string&) {}
+ virtual void writeProperties(std::string&) const {}
+ virtual void writeStatistics(std::string&, bool = false) {}
+ virtual void doMethod(std::string&, const std::string&, std::string&) {}
+
QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId);
virtual std::string& getClassName() const = 0;
diff --git a/qpid/cpp/managementgen/qmf-gen b/qpid/cpp/managementgen/qmf-gen
index ebc07137ae..667aa1ba2d 100755
--- a/qpid/cpp/managementgen/qmf-gen
+++ b/qpid/cpp/managementgen/qmf-gen
@@ -62,8 +62,10 @@ if len(args) == 0:
vargs = {}
if opts.brokerplugin:
vargs["agentHeaderDir"] = "management"
+ vargs["genQmfV1"] = True
else:
vargs["agentHeaderDir"] = "agent"
+ vargs["genQmfV1"] = None
for schemafile in args:
package = SchemaPackage(typefile, schemafile, opts)
diff --git a/qpid/cpp/managementgen/qmfgen/generate.py b/qpid/cpp/managementgen/qmfgen/generate.py
index 4052b8c853..8a00b69761 100755
--- a/qpid/cpp/managementgen/qmfgen/generate.py
+++ b/qpid/cpp/managementgen/qmfgen/generate.py
@@ -38,39 +38,43 @@ class Template:
self.filename = filename
self.handler = handler
self.handler.initExpansion ()
- self.writing = True
+ self.writing = 0 # 0 => write output lines; >0 => recursive depth of conditional regions
def expandLine (self, line, stream, object):
cursor = 0
while 1:
sub = line.find ("/*MGEN:", cursor)
if sub == -1:
- if self.writing:
+ if self.writing == 0:
stream.write (line[cursor:len (line)])
return
subend = line.find("*/", sub)
- if self.writing:
+ if self.writing == 0:
stream.write (line[cursor:sub])
cursor = subend + 2
tag = line[sub:subend]
if tag[7:10] == "IF(":
- close = tag.find(")")
- if close == -1:
- raise ValueError ("Missing ')' on condition")
- cond = tag[10:close]
- dotPos = cond.find (".")
- if dotPos == -1:
- raise ValueError ("Invalid condition tag: %s" % cond)
- tagObject = cond[0:dotPos]
- tagName = cond[dotPos + 1 : len(cond)]
- if not self.handler.testCondition(object, tagObject, tagName):
- self.writing = False
+ if self.writing == 0:
+ close = tag.find(")")
+ if close == -1:
+ raise ValueError ("Missing ')' on condition")
+ cond = tag[10:close]
+ dotPos = cond.find (".")
+ if dotPos == -1:
+ raise ValueError ("Invalid condition tag: %s" % cond)
+ tagObject = cond[0:dotPos]
+ tagName = cond[dotPos + 1 : len(cond)]
+ if not self.handler.testCondition(object, tagObject, tagName):
+ self.writing += 1
+ else:
+ self.writing += 1
elif tag[7:12] == "ENDIF":
- self.writing = True
+ if self.writing > 0:
+ self.writing -= 1
else:
equalPos = tag.find ("=")
@@ -80,12 +84,12 @@ class Template:
raise ValueError ("Invalid tag: %s" % tag)
tagObject = tag[7:dotPos]
tagName = tag[dotPos + 1:len (tag)]
- if self.writing:
+ if self.writing == 0:
self.handler.substHandler (object, stream, tagObject, tagName)
else:
tagKey = tag[7:equalPos]
tagVal = tag[equalPos + 1:len (tag)]
- if self.writing:
+ if self.writing == 0:
self.handler.setVariable (tagKey, tagVal)
def expand (self, object):
@@ -297,6 +301,9 @@ class Generator:
self.packagelist.append(path)
self.packagePath = self.normalize(self.dest + path)
+ def testGenQMFv1 (self, variables):
+ return variables["genQmfV1"]
+
def genDisclaimer (self, stream, variables):
prefix = variables["commentPrefix"]
stream.write (prefix + " This source file was created by a code generator.\n")
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
index 0302e5eba2..d1c7b0620c 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
@@ -133,7 +133,7 @@ void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* tota
}
/*MGEN:ENDIF*/
-
+/*MGEN:IF(Root.GenQMFv1)*/
uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const
{
uint32_t size = writeTimestampsSize();
@@ -263,7 +263,7 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/)
outBuf.getRawData(outStr, _bufLen);
}
-
+/*MGEN:ENDIF*/
std::string /*MGEN:Class.NameCap*/::getKey() const
{
std::stringstream key;
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h
index 026b1d79eb..81e33a01f9 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h
@@ -83,6 +83,7 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
const ::qpid::messaging::VariantMap& inMap,
::qpid::messaging::VariantMap& outMap);
std::string getKey() const;
+/*MGEN:IF(Root.GenQMFv1)*/
uint32_t writePropertiesSize() const;
void readProperties(const std::string& buf);
void writeProperties(std::string& buf) const;
@@ -90,6 +91,7 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
void doMethod(std::string& methodName,
const std::string& inBuf,
std::string& outBuf);
+/*MGEN:ENDIF*/
writeSchemaCall_t getWriteSchemaCall() { return writeSchema; }
/*MGEN:IF(Class.NoStatistics)*/
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 8d12577deb..1f88cdb70a 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -162,18 +162,18 @@ class Object(object):
if property.name in notPresent:
self._properties.append((property, None))
else:
- self._properties.append((property, self._session._decodeValue(codec, property.type, broker)))
+ self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker)))
if stat:
for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker)))
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker)))
else:
for property in schema.getProperties():
if property.optional:
self._properties.append((property, None))
else:
- self._properties.append((property, self._session._defaultValue(property, broker, kwargs)))
+ self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs)))
for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
+ self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs)))
def v2Init(self, omap, agentName):
if omap.__class__ != dict:
@@ -828,7 +828,7 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
- def _handleCommandComplete(self, broker, codec, seq):
+ def _handleCommandComplete(self, broker, codec, seq, agent):
code = codec.read_uint32()
text = codec.read_str8()
context = self.seqMgr._release(seq)
@@ -850,6 +850,10 @@ class Session:
finally:
self.cv.release()
+ if agent:
+ agent._handleV1Completion(seq, code, text)
+
+
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
classKey = ClassKey(codec)
@@ -1714,6 +1718,7 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.cv = Condition()
+ self.seqToAgentMap = {}
self.error = None
self.brokerId = None
self.connected = False
@@ -1792,6 +1797,20 @@ class Broker:
else:
return "Disconnected Broker"
+ def _setSequence(self, sequence, agent):
+ try:
+ self.cv.acquire()
+ self.seqToAgentMap[sequence] = agent
+ finally:
+ self.cv.release()
+
+ def _clearSequence(self, sequence):
+ try:
+ self.cv.acquire()
+ self.seqToAgentMap.pop(sequence)
+ finally:
+ self.cv.release()
+
def _tryToConnect(self):
try:
try:
@@ -2071,17 +2090,28 @@ class Broker:
agent = self.agents[agent_addr]
codec = Codec(msg.body)
+ alreadyTried = None
while True:
opcode, seq = self._checkHeader(codec)
+
+ if not agent and not alreadyTried:
+ alreadyTried = True
+ try:
+ self.cv.acquire()
+ if seq in self.seqToAgentMap:
+ agent = self.seqToAgentMap[seq]
+ finally:
+ self.cv.release()
+
if opcode == None: return
if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
+ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
elif agent:
- agent._handleQmfV1Message(opcode, mp, ah, codec)
+ agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
self.amqpSession.receiver._completed.add(msg.id)
self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
@@ -2261,6 +2291,7 @@ class Agent:
try:
self.lock.acquire()
self.contextMap[sequence] = context
+ context.setSequence(sequence)
finally:
self.lock.release()
@@ -2271,6 +2302,7 @@ class Agent:
if self.isV2:
self._v2SendGetQuery(sequence, kwargs)
else:
+ self.broker._setSequence(sequence, self)
self._v1SendGetQuery(sequence, kwargs)
#
@@ -2284,10 +2316,17 @@ class Agent:
if context.exception:
raise Exception(context.exception)
result = context.queryResults
- self.contextMap.pop(sequence)
return result
+ def _clearContext(self, sequence):
+ try:
+ self.lock.acquire()
+ self.contextMap.pop(sequence)
+ finally:
+ self.lock.release()
+
+
def _schemaInfoFromV2Agent(self):
"""
We have just received new schema information from this agent. Check to see if there's
@@ -2295,7 +2334,9 @@ class Agent:
"""
try:
self.lock.acquire()
- copy_of_map = self.contextMap
+ copy_of_map = {}
+ for item in self.contextMap:
+ copy_of_map[item] = self.contextMap[item]
finally:
self.lock.release()
@@ -2304,6 +2345,26 @@ class Agent:
copy_of_map[context].reprocess()
+ def _handleV1Completion(self, sequence, code, text):
+ """
+ Called if one of this agent's V1 commands completed
+ """
+ context = None
+ try:
+ self.lock.acquire()
+ if sequence in self.contextMap:
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
+
+ if context:
+ if code != 0:
+ ex = "Error %d: %s" % (code, text)
+ context.setException(ex)
+ context.signal()
+ self.broker._clearSequence(sequence)
+
+
def _v1HandleMethodResp(self, codec, seq):
"""
Handle a QMFv1 method response
@@ -2342,7 +2403,7 @@ class Agent:
self.console.event(broker, event)
- def _v1HandleContentInd(self, broker, codec, seq, prop=False, stat=False):
+ def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False):
"""
Handle a QMFv1 content indication
"""
@@ -2351,24 +2412,20 @@ class Agent:
if not schema:
return
- obj = Object(self, broker, schema, codec, prop, stat)
+ obj = Object(self, schema, codec, prop, stat)
if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
- broker._updateAgent(obj)
+ self.broker._updateAgent(obj)
try:
self.lock.acquire()
- if seq in self.syncSequenceList:
- if object.getTimestamps()[2] == 0 and self._selectMatch(object):
- self.getResult.append(object)
- return
+ if sequence in self.contextMap:
+ context = self.contextMap[sequence]
finally:
self.lock.release()
- if self.console and self.rcvObjects:
- if prop:
- self.console.objectProps(broker, object)
- if stat:
- self.console.objectStats(broker, object)
+ if not context:
+ context = self.unsolicitedContext
+ context.addV1QueryResult(obj)
def _v2HandleDataInd(self, mp, ah, content):
@@ -2376,10 +2433,14 @@ class Agent:
Handle a QMFv2 data indication from the agent
"""
if mp.correlation_id:
- sequence = int(mp.correlation_id)
- if sequence not in self.contextMap:
- return
- context = self.contextMap[sequence]
+ try:
+ self.lock.acquire()
+ sequence = int(mp.correlation_id)
+ if sequence not in self.contextMap:
+ return
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
else:
context = self.unsolicitedContext
@@ -2405,8 +2466,33 @@ class Agent:
pass
- def _v1SendGetQuery(self, kwargs):
- pass
+ def _v1SendGetQuery(self, sequence, kwargs):
+ """
+ Send a get query to a QMFv1 agent.
+ """
+ #
+ # Build the query map
+ #
+ query = {}
+ if '_class' in kwargs:
+ query['_class'] = kwargs['_class']
+ if '_package' in kwargs:
+ query['_package'] = kwargs['_package']
+ elif '_key' in kwargs:
+ key = kwargs['_key']
+ query['_class'] = key.getClassName()
+ query['_package'] = key.getPackageName()
+ elif '_objectId' in kwargs:
+ query['_objectid'] = kwargs['_objectId'].__repr__()
+
+ #
+ # Construct and transmit the message
+ #
+ sendCodec = Codec()
+ self.broker._setHeader(sendCodec, 'G', sequence)
+ sendCodec.write_map(query)
+ smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank))
+ self.broker._send(smsg)
def _v2SendGetQuery(self, sequence, kwargs):
@@ -2460,7 +2546,7 @@ class Agent:
self.broker._send(smsg, "qmf.default.direct")
- def _handleQmfV1Message(self, opcode, mp, ah, codec):
+ def _handleQmfV1Message(self, opcode, seq, mp, ah, codec):
"""
Process QMFv1 messages arriving from an agent.
"""
@@ -2487,8 +2573,10 @@ class Agent:
class RequestContext(object):
"""
This class tracks an asynchronous request sent to an agent.
+ TODO: Add logic for client-side selection and filtering deleted objects from get-queries
"""
def __init__(self, agent, notifiable):
+ self.sequence = None
self.agent = agent
self.schemaCache = self.agent.schemaCache
self.notifiable = notifiable
@@ -2497,10 +2585,22 @@ class RequestContext(object):
self.queryResults = []
self.exception = None
self.waitingForSchema = None
+ self.pendingSignal = None
self.cv = Condition()
self.blocked = notifiable == None
+ def setSequence(self, sequence):
+ self.sequence = sequence
+
+
+ def addV1QueryResult(self, data):
+ if self.notifiable:
+ self.notifyable(qmf_object=data)
+ else:
+ self.queryResults.append(data)
+
+
def addV2QueryResult(self, data):
self.rawQueryResults.append(data)
@@ -2528,10 +2628,26 @@ class RequestContext(object):
def signal(self):
try:
self.cv.acquire()
- self.blocked = None
- self.cv.notify()
+ if self.waitingForSchema:
+ self.pendingSignal = True
+ return
+ else:
+ self.blocked = None
+ self.cv.notify()
finally:
self.cv.release()
+ self._complete()
+
+
+ def _complete(self):
+ if self.notifiable:
+ if self.exception:
+ self.notifiable(qmf_exception=self.exception)
+ else:
+ self.notifiable(qmf_complete=True)
+
+ if self.sequence:
+ self.agent._clearContext(self.sequence)
def processV2Data(self):
@@ -2568,6 +2684,19 @@ class RequestContext(object):
else:
self.queryResults.append(result)
+ complete = None
+ try:
+ self.cv.acquire()
+ if not self.waitingForSchema and self.pendingSignal:
+ self.blocked = None
+ self.cv.notify()
+ complete = True
+ finally:
+ self.cv.release()
+
+ if complete:
+ self._complete()
+
def reprocess(self):
"""