summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-19 14:59:47 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-19 14:59:47 +0000
commit0b7c046413aacb145573725be13b15dfc4dd9727 (patch)
tree510c13b12ac475f84e0548db97aaa2b97805f489
parent3f216a28c0d65f703b5ca681689db8a3c08cebaf (diff)
downloadqpid-python-0b7c046413aacb145573725be13b15dfc4dd9727.tar.gz
authorize agent method call
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@925252 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp195
1 files changed, 150 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 34268c662d..873e509c36 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -54,12 +54,14 @@ namespace _qmf = qmf::org::apache::qpid::broker;
static qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname,
const std::string& cname,
+ const std::string& type,
const uint8_t *md5Sum)
{
qpid::messaging::Variant::Map map_;
map_["_package_name"] = pname;
map_["_class_name"] = cname;
+ map_["_type"] = type;
map_["_hash_str"] = std::string((const char *)md5Sum,
qpid::management::ManagementObject::MD5_LEN);
return map_;
@@ -251,6 +253,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
+ "_data",
object->getMd5Sum());
object->mapEncodeValues(values, true, false); // send props only
map_["_values"] = values;
@@ -285,6 +288,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
event.getEventName(),
+ "_event",
event.getMd5Sum());
event.mapEncode(values);
map_["_values"] = values;
@@ -300,6 +304,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
sendBuffer(msg.getContent(), 0, headers, mExchange,
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+ QPID_LOG(trace, "SEND raiseEvent class=" << event.getPackageName() << "." << event.getEventName());
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -562,6 +567,7 @@ void ManagementAgent::periodicProcessing (void)
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
+ "_data",
object->getMd5Sum());
object->mapEncodeValues(values, send_props, send_stats);
map_["_values"] = values;
@@ -617,6 +623,7 @@ void ManagementAgent::periodicProcessing (void)
map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
(*cdIter)->getClassName(),
+ "_data",
(*cdIter)->getMd5Sum());
(*cdIter)->mapEncodeValues(values, true, false);
map_["_values"] = values;
@@ -675,6 +682,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
+ "_data",
object->getMd5Sum());
object->mapEncodeValues(values, true, false);
map_["_values"] = values;
@@ -752,14 +760,14 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
uint32_t sequence, const ConnectionToken* connToken)
{
+#if 1 // deprecated
QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
-#if 1
+ // prevent "unused param" warnings
(void)inBuffer;
(void)replyToKey;
(void)sequence;
(void)connToken;
#else
- // @todo KAG use new method req format
string methodName;
string packageName;
string className;
@@ -768,9 +776,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
uint32_t outLen;
AclModule* acl = broker->getAcl();
- //ObjectId objId(inBuffer);
- assert(false); // KAG TODO FIXME
- ObjectId objId(std::string("fleabag???"));
+ ObjectId objId(inBuffer);
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
@@ -823,8 +829,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
try {
outBuffer.record();
Mutex::ScopedUnlock u(userLock);
- //iter->second->doMethod(methodName, inBuffer, outBuffer);
- assert(false); // KAG TODO FIX
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
} catch(exception& e) {
outBuffer.restore();
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -864,6 +869,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
outMap.encode();
sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << sequence);
return;
}
@@ -884,6 +890,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
outMap.encode();
sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << sequence);
return;
}
@@ -894,6 +901,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
outMap.encode();
sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << sequence);
return;
}
@@ -934,10 +942,15 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
} catch(exception& e) {
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << sequence);
+ return;
}
outMap.encode();
sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse to=" << replyTo << " seq=" << sequence);
}
@@ -1077,7 +1090,6 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
- // @todo KAG: use new schema format
string packageName;
SchemaClassKey key;
@@ -1119,7 +1131,6 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK
string packageName;
SchemaClassKey key;
- // KAG: TODO - Handle new schema format
inBuffer.record();
inBuffer.getOctet();
inBuffer.getShortString(packageName);
@@ -1362,69 +1373,163 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
{
- // KAG TODO: handle both old and new clients
-
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
+ uint32_t sequence = 0;
+ bool methodReq = false;
+ bool mapMsg = false;
+ string packageName;
+ string className;
+ string methodName;
if (msg.encodedSize() > MA_BUFFER_SIZE)
return false;
msg.encodeContent(inBuffer);
+ uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- if (!checkHeader(inBuffer, &opcode, &sequence))
- // KAG TODO: handle new map style messages also!
- //return false;
- assert(false);
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+ const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+ if (headers && headers->getAsString("app_id") == "qmf2")
+ {
+ mapMsg = true;
+
+ if (p && p->hasCorrelationId()) {
+ std::string cid = p->getCorrelationId();
+ if (!cid.empty()) {
+ try {
+ sequence = boost::lexical_cast<uint32_t>(cid);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning,
+ "Bad correlation Id for received QMF authorize req: [" << cid << "]");
+ return false;
+ }
+ }
+ }
+
+ if (headers->getAsString("qmf.opcode") == "_method_request")
+ {
+ methodReq = true;
+
+ // extract object id and method name
+
+ std::string body;
+ inBuffer.getRawData(body, bufferLen);
+ qpid::messaging::Message inMsg(body);
+ qpid::messaging::MapView inMap(inMsg);
+ qpid::messaging::MapView::const_iterator oid, mid;
+
+ ObjectId objId;
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ QPID_LOG(warning,
+ "Missing fields in QMF authorize req received.");
+ return false;
+ }
+
+ try {
+ // coversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+ } catch(exception& e) {
+ QPID_LOG(warning,
+ "Badly formatted QMF authorize req received.");
+ return false;
+ }
+
+ // look up schema for object to get package and class name
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " <<
+ objId);
+ return false;
+ }
+
+ packageName = iter->second->getPackageName();
+ className = iter->second->getClassName();
+ }
+ } else { // old style binary message format
+
+ uint8_t opcode;
+
+ if (!checkHeader(inBuffer, &opcode, &sequence))
+ return false;
+
+ if (opcode == 'M') {
+ methodReq = true;
+
+ // extract method name & schema package and class name
- if (opcode == 'M') {
+ uint8_t hash[16];
+ inBuffer.getLongLong(); // skip over object id
+ inBuffer.getLongLong();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+
+ }
+ }
+
+ if (methodReq) {
// TODO: check method call against ACL list.
+ map<acl::Property, string> params;
AclModule* acl = broker->getAcl();
if (acl == 0)
return true;
string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId();
- string packageName;
- string className;
- uint8_t hash[16];
- string methodName;
-
- map<acl::Property, string> params;
- //ObjectId objId(inBuffer);
- inBuffer.getLongLong();
- inBuffer.getLongLong();
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
-
params[acl::PROP_SCHEMAPACKAGE] = packageName;
params[acl::PROP_SCHEMACLASS] = className;
if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params))
return true;
+ // authorization failed, send reply if replyTo present
+
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
- replyToKey = rt.getRoutingKey();
+ string replyToKey = rt.getRoutingKey();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ if (mapMsg) {
- // KAG TODO: old-style response
- encodeHeader(outBuffer, 'm', sequence);
- outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- }
+ qpid::messaging::Message outMsg;
+ qpid::messaging::MapContent outMap(outMsg);
+ qpid::messaging::Variant::Map headers;
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+ outMap.encode();
+ sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyToKey);
+
+ } else {
+
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'm', sequence);
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ }
return false;
}