diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-18 21:14:54 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-18 21:14:54 +0000 |
commit | 3f216a28c0d65f703b5ca681689db8a3c08cebaf (patch) | |
tree | 2233987a28e7d1512f8074ead977b27493501894 | |
parent | ee12cdd0aa3e3e1b85e01bc31d51c47b35ef332f (diff) | |
download | qpid-python-3f216a28c0d65f703b5ca681689db8a3c08cebaf.tar.gz |
agent side method request handling
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924995 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 84 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 153 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 1 |
3 files changed, 188 insertions, 50 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index a99f24c2f1..547d71e165 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -472,60 +472,56 @@ void ManagementAgentImpl::handleConsoleAddedIndication() void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo) { -#if 1 - (void)body; - (void)sequence; - (void)replyTo; -#else string methodName; - string packageName; - string className; - uint8_t hash[16]; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; qpid::messaging::Message inMsg(body); qpid::messaging::MapView inMap(inMsg); - qpid::messaging::MapView::const_iterator i; + qpid::messaging::MapView::const_iterator oid, mid; - if ((i = inMap.find("_object_id")) == _map.end()) { - // KAG TODO: TBD!! - } - //ObjectId objId(inBuffer); - ObjectId objId(std::string("foobag?")); + qpid::messaging::Message outMsg; + qpid::messaging::MapContent outMap(outMsg); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + } else { + std::string methodName; + ObjectId objId; + qpid::messaging::Variant::Map inArgs; - encodeHeader(outBuffer, 'm', sequence); + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); - } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); - } - else - try { - outBuffer.record(); - //iter->second->doMethod(methodName, inBuffer, outBuffer); - assert(false); // TODO: fix above - } catch(exception& e) { - outBuffer.restore(); - outBuffer.putLong(Manageable::STATUS_EXCEPTION); - outBuffer.putMediumString(e.what()); + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + } else { + + iter->second->doMethod(methodName, inArgs, outMap.asMap()); + } + + } catch(exception& e) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what(); + } } - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); -#endif + qpid::messaging::Variant::Map headers; + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + outMap.encode(); + connThreadBody.sendBuffer(outMsg.getContent(), sequence, headers, "qmf.default.direct", replyTo); } void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 946a1dd2bc..34268c662d 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -439,9 +439,7 @@ void ManagementAgent::sendBuffer(const std::string& data, msg->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(data.length()); if (sequence) { - std::stringstream seqstr; - seqstr << sequence; - props->setCorrelationId(seqstr.str()); + props->setCorrelationId(boost::lexical_cast<std::string>(sequence)); } for (i = headers.begin(); i != headers.end(); ++i) { @@ -754,6 +752,13 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { + QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!"); +#if 1 + (void)inBuffer; + (void)replyToKey; + (void)sequence; + (void)connToken; +#else // @todo KAG use new method req format string methodName; string packageName; @@ -831,8 +836,111 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); +#endif } + +void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo, + uint32_t sequence, const ConnectionToken* connToken) +{ + string methodName; + qpid::messaging::Message inMsg(body); + qpid::messaging::MapView inMap(inMsg); + qpid::messaging::MapView::const_iterator oid, mid; + + qpid::messaging::Message outMsg; + qpid::messaging::MapContent outMap(outMsg); + qpid::messaging::Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) + { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + outMap.encode(); + sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); + return; + } + + ObjectId objId; + qpid::messaging::Variant::Map inArgs; + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } + } catch(exception& e) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what(); + outMap.encode(); + sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); + return; + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + outMap.encode(); + sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); + return; + } + + // validate + AclModule* acl = broker->getAcl(); + DisallowedMethods::const_iterator i; + + i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName)); + if (i != disallowed.end()) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = i->second; + outMap.encode(); + sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + return; + } + + if (acl != 0) { + string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + map<acl::Property, string> params; + params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); + params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); + outMap.encode(); + sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + return; + } + } + + // invoke the method + + try { + iter->second->doMethod(methodName, inArgs, outMap.asMap()); + } catch(exception& e) { + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what(); + } + + outMap.encode(); + sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); +} + + void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1326,8 +1434,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) void ManagementAgent::dispatchAgentCommandLH(Message& msg) { - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; uint32_t sequence; string replyToKey; @@ -1340,6 +1446,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) else return; + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + if (msg.encodedSize() > MA_BUFFER_SIZE) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.encodedSize()); @@ -1350,7 +1459,39 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); - // KAG TODO: need to handle map style method requests + const framing::FieldTable *headers = msg.getApplicationHeaders(); + + if (headers && headers->getAsString("app_id") == "qmf2") + { + std::string opcode = headers->getAsString("qmf.opcode"); + + sequence = 0; + if (p && p->hasCorrelationId()) { + std::string cid = p->getCorrelationId(); + if (!cid.empty()) { + try { + sequence = boost::lexical_cast<uint32_t>(cid); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Bad correlation Id for received QMF request."); + return; + } + } + } + + if (opcode == "_method_request") { + std::string body; + inBuffer.getRawData(body, bufferLen); + handleMethodRequestLH(body, replyToKey, sequence, msg.getPublisher()); + return; + } + + QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); + return; + } + + // old preV2 binary messages + + while (inBuffer.getPosition() < bufferLen) { if (!checkHeader(inBuffer, &opcode, &sequence)) return; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 8c5ee4475f..e74d8b419a 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -310,6 +310,7 @@ private: void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleMethodRequestLH (const std::string& body, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); |