summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Moravec <pmoravec@apache.org>2014-06-18 07:40:22 +0000
committerPavel Moravec <pmoravec@apache.org>2014-06-18 07:40:22 +0000
commit9c07d2fd2d1808f0a3b8ee35c86cb65e58444ae8 (patch)
treedbe13b2003d1be4f46463cd96dd8ab71d57668b7
parentf9c83b18c822fbebe8417818b1025503c2238b86 (diff)
downloadqpid-python-9c07d2fd2d1808f0a3b8ee35c86cb65e58444ae8.tar.gz
QPID-5817: [C++ broker] Improve ACL authorisation of QMF methods and queries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1603364 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/AclModule.h93
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp15
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp44
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h4
-rwxr-xr-xqpid/cpp/src/tests/acl.py163
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py1
-rw-r--r--qpid/doc/book/src/cpp-broker/Security.xml79
9 files changed, 388 insertions, 48 deletions
diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h
index f97c932e27..67c859c5c4 100644
--- a/qpid/cpp/src/qpid/broker/AclModule.h
+++ b/qpid/cpp/src/qpid/broker/AclModule.h
@@ -48,6 +48,7 @@ namespace acl {
OBJ_BROKER,
OBJ_LINK,
OBJ_METHOD,
+ OBJ_QUERY,
OBJECTSIZE }; // OBJECTSIZE must be last in list
// Action shared between ACL spec and ACL authorise interface
@@ -61,6 +62,9 @@ namespace acl {
ACT_DELETE,
ACT_PURGE,
ACT_UPDATE,
+ ACT_MOVE,
+ ACT_REDIRECT,
+ ACT_REROUTE,
ACTIONSIZE }; // ACTIONSIZE must be last in list
// Property used in ACL authorize interface
@@ -74,6 +78,7 @@ namespace acl {
PROP_TYPE,
PROP_ALTERNATE,
PROP_QUEUENAME,
+ PROP_EXCHANGENAME,
PROP_SCHEMAPACKAGE,
PROP_SCHEMACLASS,
PROP_POLICYTYPE,
@@ -100,6 +105,7 @@ namespace acl {
SPECPROP_TYPE = PROP_TYPE,
SPECPROP_ALTERNATE = PROP_ALTERNATE,
SPECPROP_QUEUENAME = PROP_QUEUENAME,
+ SPECPROP_EXCHANGENAME = PROP_EXCHANGENAME,
SPECPROP_SCHEMAPACKAGE = PROP_SCHEMAPACKAGE,
SPECPROP_SCHEMACLASS = PROP_SCHEMACLASS,
SPECPROP_POLICYTYPE = PROP_POLICYTYPE,
@@ -186,6 +192,7 @@ namespace acl {
if (str.compare("broker") == 0) return OBJ_BROKER;
if (str.compare("link") == 0) return OBJ_LINK;
if (str.compare("method") == 0) return OBJ_METHOD;
+ if (str.compare("query") == 0) return OBJ_QUERY;
throw qpid::Exception(str);
}
static inline std::string getObjectTypeStr(const ObjectType o) {
@@ -195,33 +202,40 @@ namespace acl {
case OBJ_BROKER: return "broker";
case OBJ_LINK: return "link";
case OBJ_METHOD: return "method";
+ case OBJ_QUERY: return "query";
default: assert(false); // should never get here
}
return "";
}
static inline Action getAction(const std::string& str) {
- if (str.compare("consume") == 0) return ACT_CONSUME;
- if (str.compare("publish") == 0) return ACT_PUBLISH;
- if (str.compare("create") == 0) return ACT_CREATE;
- if (str.compare("access") == 0) return ACT_ACCESS;
- if (str.compare("bind") == 0) return ACT_BIND;
- if (str.compare("unbind") == 0) return ACT_UNBIND;
- if (str.compare("delete") == 0) return ACT_DELETE;
- if (str.compare("purge") == 0) return ACT_PURGE;
- if (str.compare("update") == 0) return ACT_UPDATE;
+ if (str.compare("consume") == 0) return ACT_CONSUME;
+ if (str.compare("publish") == 0) return ACT_PUBLISH;
+ if (str.compare("create") == 0) return ACT_CREATE;
+ if (str.compare("access") == 0) return ACT_ACCESS;
+ if (str.compare("bind") == 0) return ACT_BIND;
+ if (str.compare("unbind") == 0) return ACT_UNBIND;
+ if (str.compare("delete") == 0) return ACT_DELETE;
+ if (str.compare("purge") == 0) return ACT_PURGE;
+ if (str.compare("update") == 0) return ACT_UPDATE;
+ if (str.compare("move") == 0) return ACT_MOVE;
+ if (str.compare("redirect") == 0) return ACT_REDIRECT;
+ if (str.compare("reroute") == 0) return ACT_REROUTE;
throw qpid::Exception(str);
}
static inline std::string getActionStr(const Action a) {
switch (a) {
- case ACT_CONSUME: return "consume";
- case ACT_PUBLISH: return "publish";
- case ACT_CREATE: return "create";
- case ACT_ACCESS: return "access";
- case ACT_BIND: return "bind";
- case ACT_UNBIND: return "unbind";
- case ACT_DELETE: return "delete";
- case ACT_PURGE: return "purge";
- case ACT_UPDATE: return "update";
+ case ACT_CONSUME: return "consume";
+ case ACT_PUBLISH: return "publish";
+ case ACT_CREATE: return "create";
+ case ACT_ACCESS: return "access";
+ case ACT_BIND: return "bind";
+ case ACT_UNBIND: return "unbind";
+ case ACT_DELETE: return "delete";
+ case ACT_PURGE: return "purge";
+ case ACT_UPDATE: return "update";
+ case ACT_MOVE: return "move";
+ case ACT_REDIRECT: return "redirect";
+ case ACT_REROUTE: return "reroute";
default: assert(false); // should never get here
}
return "";
@@ -236,6 +250,7 @@ namespace acl {
if (str.compare("type") == 0) return PROP_TYPE;
if (str.compare("alternate") == 0) return PROP_ALTERNATE;
if (str.compare("queuename") == 0) return PROP_QUEUENAME;
+ if (str.compare("exchangename") == 0) return PROP_EXCHANGENAME;
if (str.compare("schemapackage") == 0) return PROP_SCHEMAPACKAGE;
if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS;
if (str.compare("policytype") == 0) return PROP_POLICYTYPE;
@@ -259,6 +274,7 @@ namespace acl {
case PROP_TYPE: return "type";
case PROP_ALTERNATE: return "alternate";
case PROP_QUEUENAME: return "queuename";
+ case PROP_EXCHANGENAME: return "exchangename";
case PROP_SCHEMAPACKAGE: return "schemapackage";
case PROP_SCHEMACLASS: return "schemaclass";
case PROP_POLICYTYPE: return "policytype";
@@ -283,6 +299,7 @@ namespace acl {
if (str.compare("type") == 0) return SPECPROP_TYPE;
if (str.compare("alternate") == 0) return SPECPROP_ALTERNATE;
if (str.compare("queuename") == 0) return SPECPROP_QUEUENAME;
+ if (str.compare("exchangename") == 0) return SPECPROP_EXCHANGENAME;
if (str.compare("schemapackage") == 0) return SPECPROP_SCHEMAPACKAGE;
if (str.compare("schemaclass") == 0) return SPECPROP_SCHEMACLASS;
if (str.compare("policytype") == 0) return SPECPROP_POLICYTYPE;
@@ -315,6 +332,7 @@ namespace acl {
case SPECPROP_TYPE: return "type";
case SPECPROP_ALTERNATE: return "alternate";
case SPECPROP_QUEUENAME: return "queuename";
+ case SPECPROP_EXCHANGENAME: return "exchangename";
case SPECPROP_SCHEMAPACKAGE: return "schemapackage";
case SPECPROP_SCHEMACLASS: return "schemaclass";
case SPECPROP_POLICYTYPE: return "policytype";
@@ -413,12 +431,22 @@ namespace acl {
p4->insert(PROP_MAXQUEUESIZE);
p4->insert(PROP_MAXQUEUECOUNT);
+ propSetPtr p5(new propSet);
+ p5->insert(PROP_QUEUENAME);
+
+ propSetPtr p6(new propSet);
+ p6->insert(PROP_EXCHANGENAME);
+
+
actionMapPtr a1(new actionMap);
- a1->insert(actionPair(ACT_ACCESS, p0));
- a1->insert(actionPair(ACT_CREATE, p4));
- a1->insert(actionPair(ACT_PURGE, p0));
- a1->insert(actionPair(ACT_DELETE, p0));
- a1->insert(actionPair(ACT_CONSUME, p0));
+ a1->insert(actionPair(ACT_ACCESS, p0));
+ a1->insert(actionPair(ACT_CREATE, p4));
+ a1->insert(actionPair(ACT_PURGE, p0));
+ a1->insert(actionPair(ACT_DELETE, p0));
+ a1->insert(actionPair(ACT_CONSUME, p0));
+ a1->insert(actionPair(ACT_MOVE, p5));
+ a1->insert(actionPair(ACT_REDIRECT, p5));
+ a1->insert(actionPair(ACT_REROUTE, p6));
map->insert(objectPair(OBJ_QUEUE, a1));
@@ -431,14 +459,25 @@ namespace acl {
// == Method ==
- propSetPtr p5(new propSet);
- p5->insert(PROP_SCHEMAPACKAGE);
- p5->insert(PROP_SCHEMACLASS);
+ propSetPtr p7(new propSet);
+ p7->insert(PROP_SCHEMAPACKAGE);
+ p7->insert(PROP_SCHEMACLASS);
actionMapPtr a4(new actionMap);
- a4->insert(actionPair(ACT_ACCESS, p5));
+ a4->insert(actionPair(ACT_ACCESS, p7));
map->insert(objectPair(OBJ_METHOD, a4));
+
+ // == Query ==
+
+ propSetPtr p8(new propSet);
+ p8->insert(PROP_SCHEMACLASS);
+
+ actionMapPtr a5(new actionMap);
+ a5->insert(actionPair(ACT_ACCESS, p8));
+
+ map->insert(objectPair(OBJ_QUERY, a5));
+
}
//
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 348701ac4b..7bb93fc1c4 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -541,7 +541,8 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter) >= 0)
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty,
+ moveArgs.i_filter, getCurrentPublisher()) >=0)
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
@@ -609,7 +610,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
string srcQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_sourceQueue);
string tgtQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_targetQueue);
QPID_LOG (debug, "Broker::queueRedirect source queue:" << srcQueue << " to target queue " << tgtQueue);
- status = queueRedirect(srcQueue, tgtQueue);
+ status = queueRedirect(srcQueue, tgtQueue, getCurrentPublisher());
break;
}
default:
@@ -1085,7 +1086,8 @@ bool Broker::getLogHiresTimestamp()
Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
- const std::string& tgtQueue)
+ const std::string& tgtQueue,
+ const Connection* context)
{
Queue::shared_ptr srcQ(queues.find(srcQueue));
if (!srcQ) {
@@ -1133,6 +1135,13 @@ Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
return Manageable::STATUS_USER;
}
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName()));
+ if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), &params))
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(uknown)")));
+ }
+
// Start the backup overflow partnership
srcQ->setRedirectPeer(tgtQ, true);
tgtQ->setRedirectPeer(srcQ, false);
@@ -1164,6 +1173,13 @@ Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
return Manageable::STATUS_USER;
}
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName()));
+ if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), &params))
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(uknown)")));
+ }
+
queueRedirectDestroy(srcQ, tgtQ, true);
return Manageable::STATUS_OK;
@@ -1261,7 +1277,8 @@ int32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty,
- const Variant::Map& filter)
+ const Variant::Map& filter,
+ const Connection* context)
{
Queue::shared_ptr src_queue = queues.find(srcQueue);
if (!src_queue)
@@ -1270,6 +1287,13 @@ int32_t Broker::queueMoveMessages(
if (!dest_queue)
return -1;
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, dest_queue->getName()));
+ if (!acl->authorise((context)?context->getUserId():"", acl::ACT_MOVE, acl::OBJ_QUEUE, src_queue->getName(), &params))
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied move request from " << ((context)?context->getUserId():"(uknown)")));
+ }
+
return (int32_t) src_queue->move(dest_queue, qty, &filter);
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 5583d6cf29..bd18d2f5ec 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -169,7 +169,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
const Connection* context);
Manageable::status_t setTimestampConfig(const bool receive,
const Connection* context);
- Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue);
+ Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue, const Connection* context);
void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs);
boost::shared_ptr<sys::Poller> poller;
std::auto_ptr<sys::Timer> timer;
@@ -291,7 +291,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty,
- const qpid::types::Variant::Map& filter);
+ const qpid::types::Variant::Map& filter,
+ const Connection* context);
QPID_BROKER_EXTERN const TransportInfo& getTransportInfo(
const std::string& name = TCP_TRANSPORT) const;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index dcd62e42aa..07dbfccb95 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/QueueDepth.h"
@@ -73,6 +74,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using qpid::management::getCurrentPublisher;
using std::string;
using std::for_each;
using std::mem_fun;
@@ -1412,12 +1414,17 @@ ManagementObject::shared_ptr Queue::GetManagementObject(void) const
Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+ AclModule* acl = broker->getAcl();
+ std::string _userId = (getCurrentPublisher()?getCurrentPublisher()->getUserId():"");
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
switch (methodId) {
case _qmf::Queue::METHOD_PURGE :
{
+ if ((acl)&&(!(acl->authorise(_userId, acl::ACT_PURGE, acl::OBJ_QUEUE, name, NULL)))) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied purge request from " << _userId));
+ }
_qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
status = Manageable::STATUS_OK;
@@ -1445,6 +1452,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
}
}
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_EXCHANGENAME, dest->getName()));
+ if (!acl->authorise(_userId, acl::ACT_REROUTE, acl::OBJ_QUEUE, name, &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied reroute request from " << _userId));
+ }
+ }
+
purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
status = Manageable::STATUS_OK;
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 968caba760..0586171dc4 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -1791,10 +1791,11 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply
" to=" << replyToKey << " seq=" << sequence);
}
-void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const string& userId)
{
FieldTable ft;
FieldTable::ValuePtr value;
+ AclModule* acl = broker->getAcl();
moveNewObjects();
@@ -1820,6 +1821,14 @@ void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey,
if (object) {
ResizableBuffer outBuffer (qmfV1BufferSize);
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = object->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId));
+ }
+ }
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -1843,6 +1852,15 @@ void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey,
string className (value->get<string>());
std::list<ManagementObject::shared_ptr> matches;
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId));
+ }
+ }
+
if (className == "memory")
qpid::sys::MemStat::loadMemInfo(memstat.get());
@@ -1903,13 +1921,14 @@ void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey,
}
-void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
+void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, const std::string& userId, bool viaLocal)
{
moveNewObjects();
Variant::Map inMap;
Variant::Map::const_iterator i;
Variant::Map headers;
+ AclModule* acl = broker->getAcl();
MapCodec::decode(body, inMap);
QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
@@ -1982,6 +2001,14 @@ void ManagementAgent::handleGetQuery(const string& body, const string& rte, cons
object = iter->second;
}
if (object) {
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = object->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId));
+ }
+ }
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -2011,6 +2038,14 @@ void ManagementAgent::handleGetQuery(const string& body, const string& rte, cons
}
} else {
// send class-based result.
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId));
+ }
+ }
Variant::List _list;
Variant::List _subList;
unsigned int objCount = 0;
@@ -2218,7 +2253,6 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg)
}
if (methodReq) {
- // TODO: check method call against ACL list.
map<acl::Property, string> params;
AclModule* acl = broker->getAcl();
if (acl == 0)
@@ -2312,7 +2346,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
if (opcode == "_method_request")
return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal);
else if (opcode == "_query_request")
- return handleGetQuery(body, rte, rtk, cid, viaLocal);
+ return handleGetQuery(body, rte, rtk, cid, context.getUserId(), viaLocal);
else if (opcode == "_agent_locate_request")
return handleLocateRequest(body, rte, rtk, cid);
}
@@ -2334,7 +2368,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence);
else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, context.getObjectId());
- else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence, context.getMgmtId());
else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId());
}
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index c2eb5d4a31..81bf542766 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -363,9 +363,9 @@ private:
void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const ObjectId& objectId);
- void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
- void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+ void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index ec6b229ba4..5f5d1e01fe 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -27,6 +27,7 @@ from qpid.testlib import TestBase010
from qmf.console import Session
from qpid.datatypes import Message
import qpid.messaging
+from qpidtoollibs import BrokerAgent
class ACLFile:
def __init__(self, policy='data_dir/policy.acl'):
@@ -40,6 +41,14 @@ class ACLFile:
class ACLTests(TestBase010):
+ # required for testing QMF methods
+ def get_messaging_connection(self, user, passwd):
+ parms = {'username':user, 'password':passwd, 'sasl_mechanisms':'PLAIN'}
+ brokerurl="%s:%s" %(self.broker.host, self.broker.port)
+ connection = qpid.messaging.Connection(brokerurl, **parms)
+ connection.open()
+ return connection
+
# For connection limit tests this function
# throws if the connection won't start
# returns a connection that the caller can close if he likes.
@@ -796,10 +805,16 @@ class ACLTests(TestBase010):
aclf.write('acl deny bob@QPID create queue name=q1 durable=true\n')
aclf.write('acl deny bob@QPID create queue name=q2 exclusive=true policytype=ring\n')
aclf.write('acl deny bob@QPID access queue name=q3\n')
- aclf.write('acl deny bob@QPID purge queue name=q3\n')
aclf.write('acl deny bob@QPID delete queue name=q4\n')
aclf.write('acl deny bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
aclf.write('acl deny bob@QPID create queue name=q6 paging=true\n')
+ aclf.write('acl deny bob@QPID purge queue name=q7\n')
+ aclf.write('acl deny bob@QPID move queue name=q7\n')
+ aclf.write('acl deny bob@QPID move queue name=q8 queuename=q7\n')
+ aclf.write('acl deny bob@QPID redirect queue name=q7\n')
+ aclf.write('acl deny bob@QPID redirect queue name=q8 queuename=q7\n')
+ aclf.write('acl deny bob@QPID reroute queue name=q7\n')
+ aclf.write('acl deny bob@QPID reroute queue name=q8 exchangename=amq.fanout\n')
aclf.write('acl allow all all')
aclf.close()
@@ -881,18 +896,89 @@ class ACLTests(TestBase010):
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
+ # some queues needs to be created for testing purge / move / reroute / redirect
+ session.queue_declare(queue="q7")
+ session.queue_declare(queue="q8")
+ session.queue_declare(queue="q9")
try:
- session.queue_purge(queue="q3")
- self.fail("ACL should deny queue purge request for q3");
+ session.queue_purge(queue="q7")
+ self.fail("ACL should deny queue purge request for q7");
except qpid.session.SessionException, e:
self.assertEqual(403,e.args[0].error_code)
session = self.get_session('bob','bob')
try:
- session.queue_purge(queue="q4")
+ session.queue_purge(queue="q8")
except qpid.session.SessionException, e:
if (403 == e.args[0].error_code):
- self.fail("ACL should allow queue purge request for q4");
+ self.fail("ACL should allow queue purge request for q8");
+
+ # as we use QMF methods, it is easier to use BrokerAgent from messaging.connection and not use session object as above
+ broker_agent = BrokerAgent(self.get_messaging_connection('bob','bob'))
+
+ try:
+ broker_agent.queueMoveMessages("q7", "q8", 0)
+ self.fail("ACL should deny queue move request from q7 to q8");
+ except Exception, e:
+ self.assertTrue("'error_code': 7," in e.args[0])
+ broker_agent = BrokerAgent(self.get_messaging_connection('bob','bob'))
+
+ try:
+ broker_agent.queueMoveMessages("q8", "q9", 0)
+ except Exception, e:
+ if ("'error_code': 7," in e.args[0]):
+ self.fail("ACL should allow queue move request from q8 to q9");
+
+ try:
+ broker_agent.queueMoveMessages("q9", "q8", 0)
+ except Exception, e:
+ if ("'error_code': 7," in e.args[0]):
+ self.fail("ACL should allow queue move request from q9 to q8");
+
+ try:
+ broker_agent.Redirect("q7", "q8")
+ self.fail("ACL should deny queue redirect request from q7 to q8");
+ except Exception, e:
+ self.assertTrue("'error_code': 7," in e.args[0])
+ broker_agent = BrokerAgent(self.get_messaging_connection('bob','bob'))
+
+ try:
+ broker_agent.Redirect("q8", "q9")
+ except Exception, e:
+ if ("'error_code': 7," in e.args[0]):
+ self.fail("ACL should allow queue redirect request from q8 to q9");
+
+ try:
+ broker_agent.Redirect("q9", "q8")
+ except Exception, e:
+ if ("'error_code': 7," in e.args[0]):
+ self.fail("ACL should allow queue redirect request from q9 to q8");
+
+ try:
+ broker_agent.getQueue('q7').reroute(0, False, "amq.fanout")
+ self.fail("ACL should deny queue reroute request from q7 to amq.fanout");
+ except Exception, e:
+ self.assertTrue("'error_code': 7," in e.args[0])
+ broker_agent = BrokerAgent(self.get_messaging_connection('bob','bob'))
+
+ try:
+ broker_agent.getQueue('q8').reroute(0, False, "amq.fanout")
+ self.fail("ACL should deny queue reroute request from q8 to amq.fanout");
+ except Exception, e:
+ self.assertTrue("'error_code': 7," in e.args[0])
+ broker_agent = BrokerAgent(self.get_messaging_connection('bob','bob'))
+
+ try:
+ broker_agent.getQueue('q8').reroute(0, False, "amq.direct")
+ except Exception, e:
+ if ("'error_code': 7," in e.args[0]):
+ self.fail("ACL should allow queue reroute request from q8 to amq.direct");
+
+ try:
+ broker_agent.getQueue('q9').reroute(0, False, "amq.fanout")
+ except Exception, e:
+ if ("'error_code': 7," in e.args[0]):
+ self.fail("ACL should allow queue reroute request from q9 to amq.fanout");
try:
session.queue_delete(queue="q4")
@@ -1748,6 +1834,52 @@ class ACLTests(TestBase010):
self.assertEqual(7,e.args[0]["error_code"])
assert e.args[0]["error_text"].find("unauthorized-access") == 0
+ def test_qmf_query(self):
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all access exchange\n')
+ aclf.write('acl allow all bind exchange\n')
+ aclf.write('acl allow all create queue\n')
+ aclf.write('acl allow all access queue\n')
+ aclf.write('acl allow all delete queue\n')
+ aclf.write('acl allow all consume queue\n')
+ aclf.write('acl allow all access method\n')
+ aclf.write('acl allow bob@QPID access query name=org.apache.qpid.broker:queue:q1\n')
+ aclf.write('acl allow bob@QPID access query schemaclass=exchange\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+
+ try:
+ bob.query(object_name="org.apache.qpid.broker:queue:q1")
+ except Exception, e:
+ if ("unauthorized-access:" in e.args[0]):
+ self.fail("ACL should allow queue QMF query for q1");
+
+ try:
+ bob.query(object_name="org.apache.qpid.broker:queue:q2")
+ self.fail("ACL should deny queue QMF query for q2");
+ except Exception, e:
+ self.assertTrue("unauthorized-access:" in e.args[0])
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+
+ try:
+ bob.query(class_name="binding")
+ self.fail("ACL should deny class binding QMF query");
+ except Exception, e:
+ self.assertTrue("unauthorized-access:" in e.args[0])
+ bob = BrokerAdmin(self.config.broker, "bob", "bob")
+
+ try:
+ bob.query(class_name="exchange")
+ except Exception, e:
+ if ("unauthorized-access:" in e.args[0]):
+ self.fail("ACL should allow class exchange QMF query");
+
#=====================================
# ACL consume tests
@@ -3611,6 +3743,27 @@ class BrokerAdmin:
raise Exception(response.content['_values'])
else: raise Exception("Invalid response received, unexpected opcode: %s" % response.properties['qmf.opcode'])
else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response.properties['x-amqp-0-10.app-id'])
+
+ def query(self, object_name=None, class_name=None):
+ content = { "_what": "OBJECT" }
+ if object_name is not None:
+ content["_object_id"] = {"_object_name": object_name }
+ if class_name is not None:
+ content["_schema_id"] = {"_class_name": class_name }
+ request = qpid.messaging.Message(reply_to=self.reply_to, content=content)
+ request.properties["x-amqp-0-10.app-id"] = "qmf2"
+ request.properties["qmf.opcode"] = "_query_request"
+ self.sender.send(request)
+ response = self.receiver.fetch()
+ self.session.acknowledge()
+ if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
+ if response.properties['qmf.opcode'] == '_query_response':
+ return
+ elif response.properties['qmf.opcode'] == '_exception':
+ raise Exception(response.content['_values'])
+ else: raise Exception("Invalid response received, unexpected opcode: %s" % response.properties['qmf.opcode'])
+ else: raise Exception("Invalid response received, not a qmfv2 method: %s" % response.properties['x-amqp-0-10.app-id'])
+
def create_exchange(self, name, exchange_type=None, options={}):
properties = options
if exchange_type: properties["exchange_type"] = exchange_type
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e2ebf220cf..1e870c55a8 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -613,6 +613,7 @@ acl allow zag@QPID publish exchange
acl allow zag@QPID delete exchange
acl allow zag@QPID access method
acl allow zag@QPID create link
+acl allow zag@QPID access query
# Normal user
acl allow zig@QPID all all
acl deny all all
diff --git a/qpid/doc/book/src/cpp-broker/Security.xml b/qpid/doc/book/src/cpp-broker/Security.xml
index 00795a05d8..b078324908 100644
--- a/qpid/doc/book/src/cpp-broker/Security.xml
+++ b/qpid/doc/book/src/cpp-broker/Security.xml
@@ -408,11 +408,12 @@ com.sun.security.jgss.initiate {
permission = [allow | allow-log | deny | deny-log]
action = [consume | publish | create | access |
- bind | unbind | delete | purge | update]
- object = [queue | exchange | broker | link | method]
+ bind | unbind | delete | purge | update |
+ move | redirect | reroute]
+ object = [queue | exchange | broker | link | method | query]
property = [name | durable | owner | routingkey |
autodelete | exclusive |type |
- alternate | queuename |
+ alternate | queuename | exchangename |
schemapackage | schemaclass |
queuemaxsizelowerlimit |
queuemaxsizeupperlimit |
@@ -610,6 +611,39 @@ com.sun.security.jgss.initiate {
</para>
</entry>
+ </row>
+ <row>
+ <entry>
+ <command>move</command>
+ </entry>
+ <entry>
+ <para>
+ When moving messages between queues
+ </para>
+
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <command>redirect</command>
+ </entry>
+ <entry>
+ <para>
+ When redirecting messages between queues
+ </para>
+
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <command>reroute</command>
+ </entry>
+ <entry>
+ <para>
+ When rerouting messages from a queue to an exchange
+ </para>
+
+ </entry>
</row>
@@ -682,6 +716,18 @@ com.sun.security.jgss.initiate {
</entry>
</row>
+ <row>
+ <entry>
+ <command>query</command>
+ </entry>
+ <entry>
+ <para>
+ Management query (of an object or whole class)
+ </para>
+
+ </entry>
+
+ </row>
</tbody>
@@ -905,6 +951,12 @@ com.sun.security.jgss.initiate {
<entry>name schemapackage schemaclass</entry>
<entry></entry>
</row>
+ <row>
+ <entry>access</entry>
+ <entry>query</entry>
+ <entry>name schemaclass</entry>
+ <entry></entry>
+ </row>
<row>
<entry>access</entry>
<entry>queue</entry>
@@ -971,6 +1023,27 @@ com.sun.security.jgss.initiate {
<entry>name</entry>
<entry></entry>
</row>
+ <row>
+ <entry>move</entry>
+ <entry>queue</entry>
+ <entry>name</entry>
+ <entry>queuename</entry>
+ <entry></entry>
+ </row>
+ <row>
+ <entry>redirect</entry>
+ <entry>queue</entry>
+ <entry>name</entry>
+ <entry>queuename</entry>
+ <entry></entry>
+ </row>
+ <row>
+ <entry>reroute</entry>
+ <entry>queue</entry>
+ <entry>name</entry>
+ <entry>exchangename</entry>
+ <entry></entry>
+ </row>
<row>
<entry>unbind</entry>
<entry>exchange</entry>