summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-18 21:14:54 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-18 21:14:54 +0000
commit3f216a28c0d65f703b5ca681689db8a3c08cebaf (patch)
tree2233987a28e7d1512f8074ead977b27493501894
parentee12cdd0aa3e3e1b85e01bc31d51c47b35ef332f (diff)
downloadqpid-python-3f216a28c0d65f703b5ca681689db8a3c08cebaf.tar.gz
agent side method request handling
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924995 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp84
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp153
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h1
3 files changed, 188 insertions, 50 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index a99f24c2f1..547d71e165 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -472,60 +472,56 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
{
-#if 1
- (void)body;
- (void)sequence;
- (void)replyTo;
-#else
string methodName;
- string packageName;
- string className;
- uint8_t hash[16];
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
qpid::messaging::Message inMsg(body);
qpid::messaging::MapView inMap(inMsg);
- qpid::messaging::MapView::const_iterator i;
+ qpid::messaging::MapView::const_iterator oid, mid;
- if ((i = inMap.find("_object_id")) == _map.end()) {
- // KAG TODO: TBD!!
- }
- //ObjectId objId(inBuffer);
- ObjectId objId(std::string("foobag?"));
+ qpid::messaging::Message outMsg;
+ qpid::messaging::MapContent outMap(outMsg);
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+ } else {
+ std::string methodName;
+ ObjectId objId;
+ qpid::messaging::Variant::Map inArgs;
- encodeHeader(outBuffer, 'm', sequence);
+ try {
+ // coversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
- } else {
- if ((iter->second->getPackageName() != packageName) ||
- (iter->second->getClassName() != className)) {
- outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
- outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
- }
- else
- try {
- outBuffer.record();
- //iter->second->doMethod(methodName, inBuffer, outBuffer);
- assert(false); // TODO: fix above
- } catch(exception& e) {
- outBuffer.restore();
- outBuffer.putLong(Manageable::STATUS_EXCEPTION);
- outBuffer.putMediumString(e.what());
+ mid = inMap.find("_arguments");
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
}
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+ } else {
+
+ iter->second->doMethod(methodName, inArgs, outMap.asMap());
+ }
+
+ } catch(exception& e) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
+ }
}
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
-#endif
+ qpid::messaging::Variant::Map headers;
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ outMap.encode();
+ connThreadBody.sendBuffer(outMsg.getContent(), sequence, headers, "qmf.default.direct", replyTo);
}
void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 946a1dd2bc..34268c662d 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -439,9 +439,7 @@ void ManagementAgent::sendBuffer(const std::string& data,
msg->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(data.length());
if (sequence) {
- std::stringstream seqstr;
- seqstr << sequence;
- props->setCorrelationId(seqstr.str());
+ props->setCorrelationId(boost::lexical_cast<std::string>(sequence));
}
for (i = headers.begin(); i != headers.end(); ++i) {
@@ -754,6 +752,13 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
uint32_t sequence, const ConnectionToken* connToken)
{
+ QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
+#if 1
+ (void)inBuffer;
+ (void)replyToKey;
+ (void)sequence;
+ (void)connToken;
+#else
// @todo KAG use new method req format
string methodName;
string packageName;
@@ -831,8 +836,111 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+#endif
}
+
+void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo,
+ uint32_t sequence, const ConnectionToken* connToken)
+{
+ string methodName;
+ qpid::messaging::Message inMsg(body);
+ qpid::messaging::MapView inMap(inMsg);
+ qpid::messaging::MapView::const_iterator oid, mid;
+
+ qpid::messaging::Message outMsg;
+ qpid::messaging::MapContent outMap(outMsg);
+ qpid::messaging::Variant::Map headers;
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end())
+ {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ return;
+ }
+
+ ObjectId objId;
+ qpid::messaging::Variant::Map inArgs;
+
+ try {
+ // coversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+
+ mid = inMap.find("_arguments");
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
+ }
+ } catch(exception& e) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ return;
+ }
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ return;
+ }
+
+ // validate
+ AclModule* acl = broker->getAcl();
+ DisallowedMethods::const_iterator i;
+
+ i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName));
+ if (i != disallowed.end()) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = i->second;
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ return;
+ }
+
+ if (acl != 0) {
+ string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
+ params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ return;
+ }
+ }
+
+ // invoke the method
+
+ try {
+ iter->second->doMethod(methodName, inArgs, outMap.asMap());
+ } catch(exception& e) {
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
+ }
+
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+}
+
+
void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1326,8 +1434,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
void ManagementAgent::dispatchAgentCommandLH(Message& msg)
{
- Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
uint32_t sequence;
string replyToKey;
@@ -1340,6 +1446,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
else
return;
+ Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+
if (msg.encodedSize() > MA_BUFFER_SIZE) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
msg.encodedSize());
@@ -1350,7 +1459,39 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- // KAG TODO: need to handle map style method requests
+ const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+ if (headers && headers->getAsString("app_id") == "qmf2")
+ {
+ std::string opcode = headers->getAsString("qmf.opcode");
+
+ sequence = 0;
+ if (p && p->hasCorrelationId()) {
+ std::string cid = p->getCorrelationId();
+ if (!cid.empty()) {
+ try {
+ sequence = boost::lexical_cast<uint32_t>(cid);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Bad correlation Id for received QMF request.");
+ return;
+ }
+ }
+ }
+
+ if (opcode == "_method_request") {
+ std::string body;
+ inBuffer.getRawData(body, bufferLen);
+ handleMethodRequestLH(body, replyToKey, sequence, msg.getPublisher());
+ return;
+ }
+
+ QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+ return;
+ }
+
+ // old preV2 binary messages
+
+
while (inBuffer.getPosition() < bufferLen) {
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 8c5ee4475f..e74d8b419a 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -310,6 +310,7 @@ private:
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleMethodRequestLH (const std::string& body, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);