From 555230c7a5fb9d5c9af0978f5400613e2533e68c Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Tue, 30 Mar 2010 12:18:21 +0000 Subject: Methods (both styles) and Session-scope get queries now working for both V1 and V2. Almost all of the tests pass. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@929104 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/managementgen/qmfgen/schema.py | 2 +- qpid/cpp/managementgen/qmfgen/templates/Class.cpp | 2 +- qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 33 +- qpid/cpp/src/qpid/management/ManagementObject.cpp | 5 - qpid/extras/qmf/src/py/qmf/console.py | 399 +++++++++++++++------- 5 files changed, 306 insertions(+), 135 deletions(-) diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index 30d7ba872d..27216db8e1 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -1402,7 +1402,7 @@ class SchemaClass: stream.write (" status = coreObject->ManagementMethod (METHOD_" +\ method.getName().upper() + ", ioArgs, text);\n") - stream.write (" outMap[\"_status_code\"] = (status);\n") + stream.write (" outMap[\"_status_code\"] = (uint32_t) status;\n") stream.write (" outMap[\"_status_text\"] = ::qpid::management::Manageable::StatusText(status, text);\n") for arg in method.args: if arg.getDir () == "O" or arg.getDir () == "IO": diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp index d1c7b0620c..62b10446dd 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp @@ -333,6 +333,6 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMapMethodArgs*/) std::string text; /*MGEN:Class.MapMethodHandlers*/ - outMap["_status_code"] = status; + outMap["_status_code"] = (uint32_t) status; outMap["_status_text"] = Manageable::StatusText(status, text); } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 77e591dd2e..298a549651 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -459,19 +459,21 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& qpid::messaging::Message outMsg; qpid::messaging::MapContent outMap(outMsg); + outMap["_values"] = Variant::Map(); if ((oid = inMap.find("_object_id")) == inMap.end() || (mid = inMap.find("_method_name")) == inMap.end()) { - (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID; (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); failed = true; } else { string methodName; ObjectId objId; Variant::Map inArgs; + Variant::Map callMap; try { - // coversions will throw if input is invalid. + // conversions will throw if input is invalid. objId = ObjectId(oid->second.asMap()); methodName = mid->second.getString(); @@ -482,17 +484,29 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { - (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT; (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); failed = true; } else { + iter->second->doMethod(methodName, inArgs, callMap); + } - iter->second->doMethod(methodName, inArgs, outMap.asMap()); + if (callMap["_status_code"].asUint32() == 0) { + outMap["_arguments"] = Variant::Map(); + for (Variant::Map::const_iterator iter = callMap.begin(); + iter != callMap.end(); iter++) + if (iter->first != "_status_code" && iter->first != "_status_text") + outMap["_arguments"].asMap()[iter->first] = iter->second; + } else { + (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"]; + (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"]; + failed = true; } - } catch(exception& e) { + } catch(messaging::InvalidConversion& e) { outMap.clear(); - (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + outMap["_values"] = Variant::Map(); + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION; (outMap["_values"].asMap())["_status_text"] = e.what(); failed = true; } @@ -501,10 +515,13 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& Variant::Map headers; headers["method"] = "response"; headers["qmf.agent"] = name_address; - if (failed) + if (failed) { headers["qmf.opcode"] = "_exception"; - else + QPID_LOG(trace, "SENT Exception map=" << outMap); + } else { headers["qmf.opcode"] = "_method_response"; + QPID_LOG(trace, "SENT MethodResponse map=" << outMap); + } outMap.encode(); connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 183763a417..ee40ba9594 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -188,11 +188,6 @@ void ObjectId::setV2Key(const ManagementObject& object) // encode as V2-format map void ObjectId::mapEncode(messaging::VariantMap& map) const { - if (agent == 0) - map["_first"] = first; - else - map["_first"] = (first | agent->first); - map["_object_name"] = v2Key; if (!agentName.empty()) map["_agent_name"] = agentName; diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 526eb49d24..e598068262 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -287,7 +287,7 @@ class Object(object): if name == prop.name: return value if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references - deref = self._session.getObjects(_objectId=value, _broker=self._broker) + deref = self._agent.getObjects(_objectId=value) if len(deref) != 1: return None else: @@ -321,10 +321,6 @@ class Object(object): aIdx = 0 sendCodec = Codec() seq = self._session.seqMgr._reserve((method, synchronous)) - self._broker._setHeader(sendCodec, 'M', seq) - self._objectId.encode(sendCodec) - self._schema.getKey().encode(sendCodec) - sendCodec.write_str8(name) count = 0 for arg in method.arguments: @@ -333,24 +329,64 @@ class Object(object): if count != len(args): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._session._encodeValue(sendCodec, args[aIdx], arg.type) - aIdx += 1 - if timeWait: - ttl = timeWait * 1000 + if self._agent.isV2: + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = self._objectId.asMap() + call['_method_name'] = name + argMap = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + argMap[arg.name] = args[aIdx] + aIdx += 1 + call['_arguments'] = argMap + + dp = self._broker.amqpSession.delivery_properties() + dp.routing_key = self._objectId.getAgentBank() + mp = self._broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self._broker.authUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + smsg = Message(dp, mp, sendCodec.encoded) + exchange = "qmf.default.direct" + else: - ttl = None - smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), - ttl=ttl) + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank()) + self._broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + self._schema.getKey().encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._session._encodeValue(sendCodec, args[aIdx], arg.type) + aIdx += 1 + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" % + (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + exchange = "qpid.management" + if synchronous: try: self._broker.cv.acquire() self._broker.syncInFlight = True finally: self._broker.cv.release() - self._broker._send(smsg) + self._broker._send(smsg, exchange) return seq return None @@ -551,7 +587,9 @@ class Session: self.brokers.append(broker) if not self.manageConnections: - self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0)) + agent = broker.getAgent(1,0) + if agent: + agent.getObjects(_class="agent") return broker @@ -712,76 +750,17 @@ class Session: if len(agentList) == 0: return [] - pname = None - cname = None - hash = None - classKey = None - if "_schema" in kwargs: classKey = kwargs["_schema"].getKey() - elif "_key" in kwargs: classKey = kwargs["_key"] - elif "_class" in kwargs: - cname = kwargs["_class"] - if "_package" in kwargs: - pname = kwargs["_package"] - if cname == None and classKey == None and "_objectId" not in kwargs: - raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") - - map = {} - self.getSelect = [] - if "_objectId" in kwargs: - map["_objectid"] = kwargs["_objectId"].__repr__() - else: - if cname == None: - cname = classKey.getClassName() - pname = classKey.getPackageName() - hash = classKey.getHash() - map["_class"] = cname - if pname != None: map["_package"] = pname - if hash != None: map["_hash"] = hash - for item in kwargs: - if item[0] != '_': - self.getSelect.append((item, kwargs[item])) - - self.getResult = [] + # + # We now have a list of agents to query, start the queries and gather the results. + # + request = SessionGetRequest(len(agentList)) for agent in agentList: - broker = agent.broker - sendCodec = Codec() - try: - self.cv.acquire() - seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) - self.syncSequenceList.append(seq) - finally: - self.cv.release() - broker._setHeader(sendCodec, 'G', seq) - sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank)) - broker._send(smsg) - - starttime = time() - timeout = False - if "_timeout" in kwargs: - waitTime = kwargs["_timeout"] - else: - waitTime = self.DEFAULT_GET_WAIT_TIME - try: - self.cv.acquire() - while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(waitTime) - if time() - starttime > waitTime: - for pendingSeq in self.syncSequenceList: - self.seqMgr._release(pendingSeq) - self.syncSequenceList = [] - timeout = True - finally: - self.cv.release() - - if self.error: - errorText = self.error - self.error = None - raise Exception(errorText) - - if len(self.getResult) == 0 and timeout: - raise RuntimeError("No agent responded within timeout period") - return self.getResult + agent.getObjects(request, **kwargs) + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + request.wait(timeout) + return request.result def setEventFilter(self, **kwargs): @@ -1179,14 +1158,10 @@ class Session: schema = self.getSchema(schemaKey) for method in schema.getMethods(): if name == method.name: - aIdx = 0 - sendCodec = Codec() - seq = self.seqMgr._reserve((method, False)) - broker._setHeader(sendCodec, 'M', seq) - objectId.encode(sendCodec) - schemaKey.encode(sendCodec) - sendCodec.write_str8(name) - + # + # Count the arguments supplied and validate that the number is what is expected + # based on the schema. + # count = 0 for arg in method.arguments: if arg.dir.find("I") != -1: @@ -1194,17 +1169,105 @@ class Session: if count != len(argList): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._encodeValue(sendCodec, argList[aIdx], arg.type) - aIdx += 1 - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % - (objectId.getBrokerBank(), objectId.getAgentBank())) - broker._send(smsg) + aIdx = 0 + sendCodec = Codec() + seq = self.seqMgr._reserve((method, False)) + + if objectId.isV2(): + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = objectId.asMap() + call['_method_name'] = name + args = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + args[arg.name] = argList[aIdx] + aIdx += 1 + call['_arguments'] = args + + dp = broker.amqpSession.delivery_properties() + dp.routing_key = objectId.getAgentBank() + mp = broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = broker.authUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + msg = Message(dp, mp, sendCodec.encoded) + broker._send(msg, "qmf.default.direct") + + else: + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank()) + broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + broker._setHeader(sendCodec, 'M', seq) + objectId.encode(sendCodec) + schemaKey.encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._encodeValue(sendCodec, argList[aIdx], arg.type) + aIdx += 1 + smsg = broker._message(sendCodec.encoded, "agent.%d.%s" % + (objectId.getBrokerBank(), objectId.getAgentBank())) + broker._send(smsg) return seq return None +#=================================================================================================== +# SessionGetRequest +#=================================================================================================== +class SessionGetRequest(object): + """ + This class is used to track get-object queries at the Session level. + """ + def __init__(self, agentCount): + self.agentCount = agentCount + self.result = [] + self.cv = Condition() + self.waiting = True + + def __call__(self, **kwargs): + """ + Callable entry point for gathering collected objects. + """ + try: + self.cv.acquire() + if 'qmf_object' in kwargs: + self.result.append(kwargs['qmf_object']) + elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs: + self.agentCount -= 1 + if self.agentCount == 0: + self.waiting = None + self.cv.notify() + finally: + self.cv.release() + + def wait(self, timeout): + starttime = time() + try: + self.cv.acquire() + while self.waiting: + if (time() - starttime) > timeout: + raise Exception("Timed out after %d seconds" % timeout) + self.cv.wait(1) + finally: + self.cv.release() + + #=================================================================================================== # SchemaCache #=================================================================================================== @@ -1572,7 +1635,7 @@ class ObjectId: else: first = constructor.read_uint64() second = constructor.read_uint64() - self.agentName = str((first & 0x0000FFFFF0000000) >> 28) + self.agentName = str((first & 0x000000000FFFFFFF) >> 28) self.agentEpoch = (first & 0x0FFF000000000000) >> 48 self.objectName = str(second) @@ -1600,6 +1663,9 @@ class ObjectId: return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(), self.getBrokerBank(), self.getAgentBank(), self.getObject()) + def isV2(self): + return not self.agentName.isdigit() + def index(self): return self.__repr__() @@ -1622,11 +1688,11 @@ class ObjectId: return self.getSequence() == 0 def encode(self, codec): - first = self.agentEpoch << 48 + first = (self.agentEpoch << 48) + (1 << 28) second = 0 try: - first += int(self.agentName) << 28 + first += int(self.agentName) except: pass @@ -1787,7 +1853,7 @@ class Broker: def getAgent(self, brokerBank, agentBank): """ Return the agent object associated with a particular broker and agent bank value.""" - bankKey = agentBank + bankKey = str(agentBank) try: self.cv.acquire() if bankKey in self.agents: @@ -1852,7 +1918,7 @@ class Broker: try: self.cv.acquire() self.agents = {} - self.agents[0] = Agent(self, 0, "BrokerAgent") + self.agents['0'] = Agent(self, 0, "BrokerAgent") finally: self.cv.release() @@ -2330,10 +2396,19 @@ class Agent: if not callable(notifiable): raise Exception("notifiable object must be callable") + # + # Isolate the selectors from the kwargs + # + selectors = {} + for key in kwargs: + value = kwargs[key] + if key[0] != '_': + selectors[key] = value + # # Allocate a context to track this asynchronous request. # - context = RequestContext(self, notifiable) + context = RequestContext(self, notifiable, selectors) sequence = self.seqMgr._reserve(context) try: self.lock.acquire() @@ -2419,6 +2494,7 @@ class Agent: code = codec.read_uint32() text = codec.read_str16() outArgs = {} + self.broker._clearSequence(seq) pair = self.seqMgr._release(seq) if pair == None: return @@ -2426,19 +2502,19 @@ class Agent: if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: - outArgs[arg.name] = self._decodeValue(codec, arg.type, broker) + outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker) result = MethodResult(code, text, outArgs) if synchronous: try: - broker.cv.acquire() - broker.syncResult = result - broker.syncInFlight = False - broker.cv.notify() + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() finally: - broker.cv.release() + self.broker.cv.release() else: - if self.console: - self.console.methodResponse(broker, seq, result) + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) def _v1HandleEventInd(self, codec, seq): @@ -2502,12 +2578,79 @@ class Agent: context.signal() - def _v2HandleMethodRsp(self, mp, ah, content): - pass + def _v2HandleMethodResp(self, mp, ah, content): + """ + Handle a QMFv2 method response from the agent + """ + context = None + sequence = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + result = MethodResult(0, 'OK', content['_arguments']) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) def _v2HandleException(self, mp, ah, content): - pass + """ + Handle a QMFv2 exception + """ + context = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + + code = 7 + text = "" + if '_status_code' in content: + code = content['_status_code'] + if '_status_text' in content: + text = content['_status_text'] + else: + text = content + + result = MethodResult(code, text, {}) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) def _v1SendGetQuery(self, sequence, kwargs): @@ -2607,7 +2750,7 @@ class Agent: """ if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content) elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content) - elif opcode == '_method_response': self._v2HandleMethodRsp(mp, ah, content) + elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content) elif opcode == '_exception': self._v2HandleException(mp, ah, content) @@ -2619,11 +2762,12 @@ class RequestContext(object): This class tracks an asynchronous request sent to an agent. TODO: Add logic for client-side selection and filtering deleted objects from get-queries """ - def __init__(self, agent, notifiable): + def __init__(self, agent, notifiable, selectors={}): self.sequence = None self.agent = agent self.schemaCache = self.agent.schemaCache self.notifiable = notifiable + self.selectors = selectors self.startTime = time() self.rawQueryResults = [] self.queryResults = [] @@ -2639,6 +2783,16 @@ class RequestContext(object): def addV1QueryResult(self, data): + values = {} + for prop, val in data.getProperties(): + values[prop.name] = val + for stat, val in data.getStatistics(): + values[stat.name] = val + for key in values: + val = values[key] + if key in self.selectors and val != self.selectors[key]: + return + if self.notifiable: self.notifiable(qmf_object=data) else: @@ -2646,6 +2800,11 @@ class RequestContext(object): def addV2QueryResult(self, data): + values = data['_values'] + for key in values: + val = values[key] + if key in self.selectors and val != self.selectors[key]: + return self.rawQueryResults.append(data) -- cgit v1.2.1