summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management/ManagementBroker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp322
1 files changed, 220 insertions, 102 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 1bdd8ab836..17f5c14592 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -27,6 +27,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/AclModule.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -80,8 +81,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent ()
ManagementBroker::ManagementBroker () :
threadPoolSize(1), interval(10), broker(0)
{
- localBank = 5;
nextObjectId = 1;
+ brokerBank = 1;
bootSequence = 1;
nextRemoteBank = 10;
nextRequestSequence = 1;
@@ -112,7 +113,7 @@ ManagementBroker::~ManagementBroker ()
}
}
-void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads)
+void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
interval = _interval;
@@ -140,7 +141,10 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable
inFile.close();
QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
+ // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
bootSequence++;
+ if (bootSequence & 0xF000)
+ bootSequence = 1;
writeData();
}
else
@@ -183,29 +187,26 @@ void ManagementBroker::RegisterClass (string packageName,
AddClass(pIter, className, md5Sum, schemaCall);
}
-uint64_t ManagementBroker::addObject (ManagementObject* object,
- uint32_t persistId,
- uint32_t persistBank)
+ObjectId ManagementBroker::addObject (ManagementObject* object,
+ uint64_t persistId)
{
Mutex::ScopedLock lock (addLock);
- uint64_t objectId;
+ uint16_t sequence;
+ uint64_t objectNum;
- if (persistId == 0)
- {
- objectId = ((uint64_t) bootSequence) << 48 |
- ((uint64_t) localBank) << 24 | nextObjectId++;
- if ((nextObjectId & 0xFF000000) != 0)
- {
- nextObjectId = 1;
- localBank++;
- }
+ if (persistId == 0) {
+ sequence = bootSequence;
+ objectNum = nextObjectId++;
+ } else {
+ sequence = 0;
+ objectNum = persistId;
}
- else
- objectId = ((uint64_t) persistBank) << 24 | persistId;
- object->setObjectId (objectId);
- newManagementObjects[objectId] = object;
- return objectId;
+ ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
+
+ object->setObjectId(objId);
+ newManagementObjects[objId] = object;
+ return objId;
}
ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
@@ -308,7 +309,7 @@ void ManagementBroker::PeriodicProcessing (void)
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
- std::list<uint64_t> deleteList;
+ std::list<ObjectId> deleteList;
{
Buffer msgBuffer(msgChars, BUFSIZE);
@@ -373,7 +374,7 @@ void ManagementBroker::PeriodicProcessing (void)
}
// Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin ();
iter != deleteList.rend ();
iter++)
managementObjects.erase (*iter);
@@ -408,48 +409,72 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
// Parse the routing key. This management broker should act as though it
// is bound to the exchange to match the following keys:
//
- // agent.<X>.#
- // broker.#
- //
- // where <X> is any non-negative decimal integer less than the lowest remote
- // object-id bank.
+ // agent.0.#
+ // broker
if (routingKey == "broker") {
- dispatchAgentCommandLH (msg);
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+
+ else if (routingKey.compare(0, 7, "agent.0") == 0) {
+ dispatchAgentCommandLH(msg);
return false;
}
else if (routingKey.compare(0, 6, "agent.") == 0) {
- std::string::size_type delim = routingKey.find('.', 6);
- if (delim == string::npos)
- delim = routingKey.length();
- string bank = routingKey.substr(6, delim - 6);
- if ((uint32_t) atoi(bank.c_str()) <= localBank) {
- dispatchAgentCommandLH (msg);
- return false;
- }
+ return authorizeAgentMessageLH(msg);
}
return true;
}
-void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
+ uint32_t sequence, const ConnectionToken* connToken)
{
string methodName;
+ string packageName;
+ string className;
+ uint8_t hash[16];
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ AclModule* acl = broker->getAcl();
- uint64_t objId = inBuffer.getLongLong();
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
-
EncodeHeader(outBuffer, 'm', sequence);
+ if (acl != 0) {
+ string userId = ((const broker::ConnectionState*) connToken)->getUserId();
+ std::map<acl::Property, string> params;
+ params[acl::SCHEMAPACKAGE] = packageName;
+ params[acl::SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params)) {
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ return;
+ }
+ }
+
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ if ((iter->second->getPackageName() != packageName) ||
+ (iter->second->getClassName() != className)) {
+ outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+ }
+ else
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -497,34 +522,33 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey
FindOrAddPackageLH(packageName);
}
-void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
std::string packageName;
- inBuffer.getShortString (packageName);
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
+ inBuffer.getShortString(packageName);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end())
{
ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin ();
- cIter != cMap.end ();
+ for (ClassMap::iterator cIter = cMap.begin();
+ cIter != cMap.end();
cIter++)
{
- if (cIter->second->hasSchema ())
+ if (cIter->second.hasSchema())
{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q', sequence);
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ EncodeHeader(outBuffer, 'q', sequence);
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
}
}
-
- sendCommandComplete (replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
@@ -551,9 +575,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- SchemaClass* newSchema = new SchemaClass;
- newSchema->pendingSequence = sequence;
- pIter->second[key] = newSchema;
+ pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
}
}
@@ -569,7 +591,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
buf.putRawData(buffer, bufferLen);
}
-void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -578,33 +600,33 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getShortString (key.name);
inBuffer.getBin128 (key.hash);
- PackageMap::iterator pIter = packages.find (packageName);
+ PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find (key);
+ ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- SchemaClass* classInfo = cIter->second;
+ SchemaClass& classInfo = cIter->second;
- if (classInfo->hasSchema()) {
+ if (classInfo.hasSchema()) {
EncodeHeader(outBuffer, 's', sequence);
- classInfo->appendSchema (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ classInfo.appendSchema(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Schema not available");
+ sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Class key not found");
+ sendCommandComplete(replyToKey, sequence, 1, "Class key not found");
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Package not found");
+ sendCommandComplete(replyToKey, sequence, 1, "Package not found");
}
-void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -619,24 +641,26 @@ void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyT
if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+ if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
size_t length = ValidateSchema(inBuffer);
- if (length == 0)
+ if (length == 0) {
+ QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
+ }
else {
- cIter->second->buffer = (uint8_t*) malloc(length);
- cIter->second->bufferLen = length;
- inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+ cIter->second.buffer = (uint8_t*) malloc(length);
+ cIter->second.bufferLen = length;
+ inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen);
// Publish a class-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q');
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ EncodeHeader(outBuffer, 'q');
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
}
}
}
@@ -671,14 +695,14 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
void ManagementBroker::deleteOrphanedAgentsLH()
{
- vector<uint64_t> deleteList;
+ vector<ObjectId> deleteList;
for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
- uint64_t connectionRef = aIter->first;
+ ObjectId connectionRef = aIter->first;
bool found = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
iter++) {
if (iter->first == connectionRef && !iter->second->isDeleted()) {
found = true;
@@ -692,10 +716,8 @@ void ManagementBroker::deleteOrphanedAgentsLH()
}
}
- for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
-
+ for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++)
remoteAgents.erase(*dIter);
- }
deleteList.clear();
}
@@ -705,7 +727,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+ ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
moveNewObjectsLH();
@@ -741,6 +763,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
uint32_t outLen;
EncodeHeader (outBuffer, 'a', sequence);
+ outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
@@ -786,13 +809,77 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::dispatchAgentCommandLH (Message& msg)
+bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
uint32_t sequence;
string replyToKey;
+ if (msg.encodedSize() > MA_BUFFER_SIZE)
+ return false;
+
+ msg.encodeContent(inBuffer);
+ inBuffer.reset();
+
+ if (!CheckHeader(inBuffer, &opcode, &sequence))
+ return false;
+
+ if (opcode == 'M') {
+ // TODO: check method call against ACL list.
+ AclModule* acl = broker->getAcl();
+ if (acl == 0)
+ return true;
+
+ string userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId();
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ string methodName;
+
+ std::map<acl::Property, string> params;
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+
+ params[acl::SCHEMAPACKAGE] = packageName;
+ params[acl::SCHEMACLASS] = className;
+
+ if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params))
+ return true;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ replyToKey = rt.getRoutingKey();
+
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'm', sequence);
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ return false;
+ }
+
+ return true;
+}
+
+void ManagementBroker::dispatchAgentCommandLH(Message& msg)
+{
+ Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToKey;
+
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
@@ -823,7 +910,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
}
ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
@@ -834,7 +921,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std:
// No such package found, create a new map entry.
pair<PackageMap::iterator, bool> result =
- packages.insert (pair<string, ClassMap> (name, ClassMap ()));
+ packages.insert(pair<string, ClassMap>(name, ClassMap()));
QPID_LOG (debug, "ManagementBroker added package " << name);
// Publish a package-indication message
@@ -859,20 +946,18 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter,
ClassMap& cMap = pIter->second;
key.name = className;
- memcpy (&key.hash, md5Sum, 16);
+ memcpy(&key.hash, md5Sum, 16);
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
return;
// No such class found, create a new class with local information.
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- SchemaClass* classInfo = new SchemaClass;
- classInfo->writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
- cIter = cMap.find (key);
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+ cIter = cMap.find(key);
}
void ManagementBroker::EncodePackageIndication (Buffer& buf,
@@ -917,6 +1002,8 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
for (uint16_t idx = 0; idx < methCount; idx++) {
FieldTable ft;
ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
int argCount = ft.getInt("argCount");
for (int mIdx = 0; mIdx < argCount; mIdx++) {
FieldTable aft;
@@ -924,10 +1011,41 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
}
}
- if (evntCount != 0)
- return 0;
+ for (uint16_t idx = 0; idx < evntCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
end = inBuffer.getPosition();
inBuffer.restore(); // restore original position
return end - start;
}
+
+Mutex& ManagementBroker::getMutex()
+{
+ return userLock;
+}
+
+Buffer* ManagementBroker::startEventLH()
+{
+ Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
+ EncodeHeader(*outBuffer, 'e');
+ outBuffer->putLongLong(uint64_t(Duration(now())));
+ return outBuffer;
+}
+
+void ManagementBroker::finishEventLH(Buffer* outBuffer)
+{
+ uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
+ outBuffer->reset();
+ SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
+ delete outBuffer;
+}
+