diff options
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 322 |
1 files changed, 220 insertions, 102 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 1bdd8ab836..17f5c14592 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -27,6 +27,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" +#include "qpid/broker/AclModule.h" #include <list> #include <iostream> #include <fstream> @@ -80,8 +81,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent () ManagementBroker::ManagementBroker () : threadPoolSize(1), interval(10), broker(0) { - localBank = 5; nextObjectId = 1; + brokerBank = 1; bootSequence = 1; nextRemoteBank = 10; nextRequestSequence = 1; @@ -112,7 +113,7 @@ ManagementBroker::~ManagementBroker () } } -void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) +void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads) { dataDir = _dataDir; interval = _interval; @@ -140,7 +141,10 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; + if (bootSequence & 0xF000) + bootSequence = 1; writeData(); } else @@ -183,29 +187,26 @@ void ManagementBroker::RegisterClass (string packageName, AddClass(pIter, className, md5Sum, schemaCall); } -uint64_t ManagementBroker::addObject (ManagementObject* object, - uint32_t persistId, - uint32_t persistBank) +ObjectId ManagementBroker::addObject (ManagementObject* object, + uint64_t persistId) { Mutex::ScopedLock lock (addLock); - uint64_t objectId; + uint16_t sequence; + uint64_t objectNum; - if (persistId == 0) - { - objectId = ((uint64_t) bootSequence) << 48 | - ((uint64_t) localBank) << 24 | nextObjectId++; - if ((nextObjectId & 0xFF000000) != 0) - { - nextObjectId = 1; - localBank++; - } + if (persistId == 0) { + sequence = bootSequence; + objectNum = nextObjectId++; + } else { + sequence = 0; + objectNum = persistId; } - else - objectId = ((uint64_t) persistBank) << 24 | persistId; - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; + ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); + + object->setObjectId(objId); + newManagementObjects[objId] = object; + return objId; } ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) @@ -308,7 +309,7 @@ void ManagementBroker::PeriodicProcessing (void) char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; - std::list<uint64_t> deleteList; + std::list<ObjectId> deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); @@ -373,7 +374,7 @@ void ManagementBroker::PeriodicProcessing (void) } // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin (); iter != deleteList.rend (); iter++) managementObjects.erase (*iter); @@ -408,48 +409,72 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // - // agent.<X>.# - // broker.# - // - // where <X> is any non-negative decimal integer less than the lowest remote - // object-id bank. + // agent.0.# + // broker if (routingKey == "broker") { - dispatchAgentCommandLH (msg); + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 7, "agent.0") == 0) { + dispatchAgentCommandLH(msg); return false; } else if (routingKey.compare(0, 6, "agent.") == 0) { - std::string::size_type delim = routingKey.find('.', 6); - if (delim == string::npos) - delim = routingKey.length(); - string bank = routingKey.substr(6, delim - 6); - if ((uint32_t) atoi(bank.c_str()) <= localBank) { - dispatchAgentCommandLH (msg); - return false; - } + return authorizeAgentMessageLH(msg); } return true; } -void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, + uint32_t sequence, const ConnectionToken* connToken) { string methodName; + string packageName; + string className; + uint8_t hash[16]; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + AclModule* acl = broker->getAcl(); - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + if (acl != 0) { + string userId = ((const broker::ConnectionState*) connToken)->getUserId(); + std::map<acl::Property, string> params; + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + return; + } + } + ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -497,34 +522,33 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey FindOrAddPackageLH(packageName); } -void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { std::string packageName; - inBuffer.getShortString (packageName); - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) + inBuffer.getShortString(packageName); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin (); - cIter != cMap.end (); + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); cIter++) { - if (cIter->second->hasSchema ()) + if (cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q', sequence); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + EncodeHeader(outBuffer, 'q', sequence); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } } } - - sendCommandComplete (replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) @@ -551,9 +575,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); - SchemaClass* newSchema = new SchemaClass; - newSchema->pendingSequence = sequence; - pIter->second[key] = newSchema; + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence))); } } @@ -569,7 +591,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(buffer, bufferLen); } -void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -578,33 +600,33 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getShortString (key.name); inBuffer.getBin128 (key.hash); - PackageMap::iterator pIter = packages.find (packageName); + PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap cMap = pIter->second; - ClassMap::iterator cIter = cMap.find (key); + ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - SchemaClass* classInfo = cIter->second; + SchemaClass& classInfo = cIter->second; - if (classInfo->hasSchema()) { + if (classInfo.hasSchema()) { EncodeHeader(outBuffer, 's', sequence); - classInfo->appendSchema (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + classInfo.appendSchema(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } else - sendCommandComplete (replyToKey, sequence, 1, "Schema not available"); + sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); } else - sendCommandComplete (replyToKey, sequence, 1, "Class key not found"); + sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); } else - sendCommandComplete (replyToKey, sequence, 1, "Package not found"); + sendCommandComplete(replyToKey, sequence, 1, "Package not found"); } -void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -619,24 +641,26 @@ void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyT if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { size_t length = ValidateSchema(inBuffer); - if (length == 0) + if (length == 0) { + QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); + } else { - cIter->second->buffer = (uint8_t*) malloc(length); - cIter->second->bufferLen = length; - inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + cIter->second.buffer = (uint8_t*) malloc(length); + cIter->second.bufferLen = length; + inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); // Publish a class-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q'); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + EncodeHeader(outBuffer, 'q'); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } @@ -671,14 +695,14 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) void ManagementBroker::deleteOrphanedAgentsLH() { - vector<uint64_t> deleteList; + vector<ObjectId> deleteList; for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { - uint64_t connectionRef = aIter->first; + ObjectId connectionRef = aIter->first; bool found = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { if (iter->first == connectionRef && !iter->second->isDeleted()) { found = true; @@ -692,10 +716,8 @@ void ManagementBroker::deleteOrphanedAgentsLH() } } - for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { - + for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) remoteAgents.erase(*dIter); - } deleteList.clear(); } @@ -705,7 +727,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe string label; uint32_t requestedBank; uint32_t assignedBank; - uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; moveNewObjectsLH(); @@ -741,6 +763,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe uint32_t outLen; EncodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); @@ -786,13 +809,77 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::dispatchAgentCommandLH (Message& msg) +bool ManagementBroker::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; uint32_t sequence; string replyToKey; + if (msg.encodedSize() > MA_BUFFER_SIZE) + return false; + + msg.encodeContent(inBuffer); + inBuffer.reset(); + + if (!CheckHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + // TODO: check method call against ACL list. + AclModule* acl = broker->getAcl(); + if (acl == 0) + return true; + + string userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId(); + string packageName; + string className; + uint8_t hash[16]; + string methodName; + + std::map<acl::Property, string> params; + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) + return true; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + return false; + } + + return true; +} + +void ManagementBroker::dispatchAgentCommandLH(Message& msg) +{ + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { @@ -823,7 +910,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) @@ -834,7 +921,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std: // No such package found, create a new map entry. pair<PackageMap::iterator, bool> result = - packages.insert (pair<string, ClassMap> (name, ClassMap ())); + packages.insert(pair<string, ClassMap>(name, ClassMap())); QPID_LOG (debug, "ManagementBroker added package " << name); // Publish a package-indication message @@ -859,20 +946,18 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter, ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - SchemaClass* classInfo = new SchemaClass; - classInfo->writeSchemaCall = schemaCall; - cMap[key] = classInfo; - cIter = cMap.find (key); + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall))); + cIter = cMap.find(key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, @@ -917,6 +1002,8 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) for (uint16_t idx = 0; idx < methCount; idx++) { FieldTable ft; ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; int argCount = ft.getInt("argCount"); for (int mIdx = 0; mIdx < argCount; mIdx++) { FieldTable aft; @@ -924,10 +1011,41 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) } } - if (evntCount != 0) - return 0; + for (uint16_t idx = 0; idx < evntCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } end = inBuffer.getPosition(); inBuffer.restore(); // restore original position return end - start; } + +Mutex& ManagementBroker::getMutex() +{ + return userLock; +} + +Buffer* ManagementBroker::startEventLH() +{ + Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); + EncodeHeader(*outBuffer, 'e'); + outBuffer->putLongLong(uint64_t(Duration(now()))); + return outBuffer; +} + +void ManagementBroker::finishEventLH(Buffer* outBuffer) +{ + uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); + outBuffer->reset(); + SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event"); + delete outBuffer; +} + |