diff options
author | Ted Ross <tross@apache.org> | 2008-09-03 18:01:44 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-09-03 18:01:44 +0000 |
commit | 13191e7951b29453c75db8384391d26f0402139d (patch) | |
tree | b3ca5bd5f9c8523924cf9d1a0fe118099bef9894 /cpp/src | |
parent | b8cf7ea1034fa6216183f03022c83efdb154f3ee (diff) | |
download | qpid-python-13191e7951b29453c75db8384391d26f0402139d.tar.gz |
QPID-1174 Updates to the management framework
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@691700 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgent.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 585 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 101 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AclModule.h | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/System.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Vhost.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldTable.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/Manageable.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/Manageable.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 322 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 33 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 74 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 45 |
16 files changed, 875 insertions, 352 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h index e7379e6c94..1c219f7463 100644 --- a/cpp/src/qpid/agent/ManagementAgent.h +++ b/cpp/src/qpid/agent/ManagementAgent.h @@ -65,10 +65,14 @@ class ManagementAgent // agent's thread. In this case, the callback implementations // MUST be thread safe. // + // storeFile - File where this process has read and write access. This + // file shall be used to store persistent state. + // virtual void init (std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, - bool useExternalThread = false) = 0; + bool useExternalThread = false, + std::string storeFile = "") = 0; // Register a schema with the management agent. This is normally called by the // package initializer generated by the management code generator. @@ -93,9 +97,8 @@ class ManagementAgent // pointer. This allows the management agent to report the deletion of the object // in an orderly way. // - virtual uint64_t addObject (ManagementObject* objectPtr, - uint32_t persistId = 0, - uint32_t persistBank = 4) = 0; + virtual ObjectId addObject (ManagementObject* objectPtr, + uint64_t persistId = 0) = 0; // If "useExternalThread" was set to true in init, this method must // be called to provide a thread for any pending method calls that have arrived. @@ -120,6 +123,12 @@ class ManagementAgent // virtual int getSignalFd (void) = 0; +protected: + friend class ManagementObject; + virtual sys::Mutex& getMutex() = 0; + virtual framing::Buffer* startEventLH() = 0; + virtual void finishEventLH(framing::Buffer* buf) = 0; + }; }} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index ebdc71e3b1..c4108b0ae2 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -24,12 +24,21 @@ #include <list> #include <unistd.h> #include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <fcntl.h> +#include <iostream> +#include <fstream> + using namespace qpid::client; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; using std::stringstream; +using std::ofstream; +using std::ifstream; using std::string; using std::cout; using std::endl; @@ -66,128 +75,186 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() return agent; } +const string ManagementAgentImpl::storeMagicNumber("MA01"); + ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) + extThread(false), writeFd(-1), readFd(-1), + clientWasAdded(true), requestedBank(0), + assignedBank(0), brokerBank(0), bootSequence(0), + connThreadBody(*this), connThread(connThreadBody), + pubThreadBody(*this), pubThread(pubThreadBody) { // TODO: Establish system ID } -void ManagementAgentImpl::init(std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +void ManagementAgentImpl::init(string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread, + string _storeFile) { - { - Mutex::ScopedLock lock(agentLock); - startupWait = true; - } - interval = intervalSeconds; extThread = useExternalThread; + storeFile = _storeFile; nextObjectId = 1; + host = brokerHost; + port = brokerPort; + + // TODO: Abstract the socket calls for portability + if (extThread) { + int pair[2]; + int result = socketpair(PF_LOCAL, SOCK_STREAM, 0, pair); + if (result == -1) { + return; + } + writeFd = pair[0]; + readFd = pair[1]; - sessionId.generate(); - queueName << "qmfagent-" << sessionId; - string dest = "qmfagent"; - - connection.open(brokerHost.c_str(), brokerPort); - session = connection.newSession (queueName.str()); - dispatcher = new client::Dispatcher(session); + // Set the readFd to non-blocking + int flags = fcntl(readFd, F_GETFL); + fcntl(readFd, F_SETFL, flags | O_NONBLOCK); + } + retrieveData(); + bootSequence++; + if ((bootSequence & 0xF000) != 0) + bootSequence = 1; + storeData(true); +} - session.queueDeclare (arg::queue=queueName.str()); - session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str()); - session.messageSubscribe (arg::queue=queueName.str(), - arg::destination=dest); - session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); - session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); +ManagementAgentImpl::~ManagementAgentImpl() +{ +} - Message attachRequest; - char rawbuffer[512]; - Buffer buffer (rawbuffer, 512); +void ManagementAgentImpl::RegisterClass(std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = FindOrAddPackage(packageName); + AddClassLocal(pIter, className, md5Sum, schemaCall); +} - attachRequest.getDeliveryProperties().setRoutingKey("broker"); - attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + uint64_t persistId) +{ + Mutex::ScopedLock lock(addLock); + uint16_t sequence = persistId ? 0 : bootSequence; + uint64_t objectNum = persistId ? persistId : nextObjectId++; - EncodeHeader (buffer, 'A'); - buffer.putShortString ("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong (11); + ObjectId objectId(&attachment, 0, sequence, objectNum); - size_t length = 512 - buffer.available (); - string stringBuffer (rawbuffer, length); - attachRequest.setData (stringBuffer); + // TODO: fix object-id handling + object->setObjectId(objectId); + newManagementObjects[objectId] = object; + return objectId; +} - session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) +{ + Mutex::ScopedLock lock(agentLock); - dispatcher->listen(dest, this); - dispatcher->start(); + for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) { + if (methodQueue.empty()) + break; - { - Mutex::ScopedLock lock(agentLock); - if (startupWait) - startupCond.wait(agentLock); + QueuedMethod* item = methodQueue.front(); + methodQueue.pop_front(); + { + Mutex::ScopedUnlock unlock(agentLock); + Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size()); + invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + delete item; + } } + + uint8_t rbuf[100]; + while (read(readFd, rbuf, 100) > 0); // Consume all signaling bytes + return methodQueue.size(); } -ManagementAgentImpl::~ManagementAgentImpl() +int ManagementAgentImpl::getSignalFd(void) { - dispatcher->stop(); - session.close(); - delete dispatcher; + return readFd; } -void ManagementAgentImpl::RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock(agentLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); +void ManagementAgentImpl::startProtocol() +{ + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + EncodeHeader(buffer, 'A'); + buffer.putShortString("RemoteAgent [C++]"); + systemId.encode (buffer); + buffer.putLong(requestedBank); + uint32_t length = 512 - buffer.available(); + buffer.reset(); + connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); } -uint64_t ManagementAgentImpl::addObject (ManagementObject* object, - uint32_t /*persistId*/, - uint32_t /*persistBank*/) +void ManagementAgentImpl::storeData(bool requested) { - Mutex::ScopedLock lock(addLock); - uint64_t objectId; + if (!storeFile.empty()) { + ofstream outFile(storeFile.c_str()); + uint32_t bankToWrite = requested ? requestedBank : assignedBank; - // TODO: fix object-id handling - objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF); - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; + if (outFile.good()) { + outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl; + outFile.close(); + } + } } -uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) +void ManagementAgentImpl::retrieveData() { - return 0; + if (!storeFile.empty()) { + ifstream inFile(storeFile.c_str()); + string mn; + + if (inFile.good()) { + inFile >> mn; + if (mn == storeMagicNumber) { + inFile >> requestedBank; + inFile >> bootSequence; + } + inFile.close(); + } + } } -int ManagementAgentImpl::getSignalFd(void) +void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, + uint32_t code, string text) { - return -1; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'z', sequence); + outBuffer.putLong(code); + outBuffer.putShortString(text); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); } void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); - uint32_t assigned; - stringstream key; - assigned = inBuffer.getLong(); - objIdPrefix = ((uint64_t) assigned) << 24; + brokerBank = inBuffer.getLong(); + assignedBank = inBuffer.getLong(); + if (assignedBank != requestedBank) { + if (requestedBank == 0) + cout << "Initial object-id bank assigned: " << assignedBank << endl; + else + cout << "Collision in object-id! New bank assigned: " << assignedBank << endl; + storeData(); + } - startupWait = false; - startupCond.notify(); + attachment.setBanks(brokerBank, assignedBank); // Bind to qpid.management to receive commands - key << "agent." << assigned; - session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), - arg::bindingKey=key.str()); + connThreadBody.bindToBank(assignedBank); // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); @@ -198,9 +265,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) EncodeHeader(outBuffer, 'p'); EncodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; @@ -208,9 +275,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) outBuffer.reset(); EncodeHeader(outBuffer, 'q'); EncodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -236,9 +303,9 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc EncodeHeader(outBuffer, 's', sequence); schema.writeSchemaCall(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -249,28 +316,93 @@ void ManagementAgentImpl::handleConsoleAddedIndication() clientWasAdded = true; } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) { string methodName; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + string packageName; + string className; + uint8_t hash[16]; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); EncodeHeader(outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "amq.direct", replyTo); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + ft.decode(inBuffer); + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo<string>()) + { + // TODO: Send completion with an error code + return; + } + + string className(value->get<string>()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) + { + ManagementObject* object = iter->second; + if (object->getClassName() == className) + { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + } + } + + sendCommandComplete(replyTo, sequence); +} + +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + if (extThread) { + Mutex::ScopedLock lock(agentLock); + string body; + + inBuffer.getRawData(body, inBuffer.available()); + methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + write(writeFd, "X", 1); + } else { + invokeMethodRequest(inBuffer, sequence, replyTo); + } } void ManagementAgentImpl::received(Message& msg) @@ -287,103 +419,86 @@ void ManagementAgentImpl::received(Message& msg) replyToKey = rt.getRoutingKey(); } - if (CheckHeader (inBuffer, &opcode, &sequence)) + if (CheckHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } -void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('1'); - buf.putOctet (opcode); - buf.putLong (seq); + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('1'); + buf.putOctet(opcode); + buf.putLong (seq); } -bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) return false; - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); - *opcode = buf.getOctet (); - *seq = buf.getLong (); + *opcode = buf.getOctet(); + *seq = buf.getLong(); return h1 == 'A' && h2 == 'M' && h3 == '1'; } -void ManagementAgentImpl::SendBuffer (Buffer& buf, - uint32_t length, - string exchange, - string routingKey) -{ - Message msg; - string data; - - if (objIdPrefix == 0) - return; - - buf.getRawData(data, length); - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData (data); - session.messageTransfer (arg::content=msg, arg::destination=exchange); -} - -ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name) +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name) { - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) + PackageMap::iterator pIter = packages.find(name); + if (pIter != packages.end()) return pIter; // No such package found, create a new map entry. std::pair<PackageMap::iterator, bool> result = - packages.insert (std::pair<string, ClassMap> (name, ClassMap ())); + packages.insert(std::pair<string, ClassMap>(name, ClassMap())); // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package"); + EncodeHeader(outBuffer, 'p'); + EncodePackageIndication(outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package"); return result.first; } void ManagementAgentImpl::moveNewObjectsLH() { - Mutex::ScopedLock lock (addLock); - for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); - iter != newManagementObjects.end (); + Mutex::ScopedLock lock(addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin(); + iter != newManagementObjects.end(); iter++) managementObjects[iter->first] = iter->second; newManagementObjects.clear(); } -void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) +void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. @@ -395,21 +510,21 @@ void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, // TODO: Publish a class-indication message } -void ManagementAgentImpl::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) +void ManagementAgentImpl::EncodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) { - buf.putShortString ((*pIter).first); + buf.putShortString((*pIter).first); } -void ManagementAgentImpl::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) +void ManagementAgentImpl::EncodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128 (key.hash); } void ManagementAgentImpl::PeriodicProcessing() @@ -419,17 +534,17 @@ void ManagementAgentImpl::PeriodicProcessing() char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; - std::list<uint64_t> deleteList; + std::list<ObjectId> deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); EncodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); routingKey = "mgmt." + systemId.str() + ".heartbeat"; - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } moveNewObjectsLH(); @@ -437,65 +552,171 @@ void ManagementAgentImpl::PeriodicProcessing() if (clientWasAdded) { clientWasAdded = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - object->setAllChanged (); + object->setAllChanged(); } } - if (managementObjects.empty ()) + if (managementObjects.empty()) return; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - if (object->getConfigChanged () || object->isDeleted ()) + if (object->getConfigChanged() || object->isDeleted()) { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'c'); object->writeProperties(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - if (object->getInstChanged ()) + if (object->getInstChanged()) { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'i'); object->writeStatistics(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + routingKey = "mgmt." + systemId.str() + ".stat." + object->getClassName(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - if (object->isDeleted ()) - deleteList.push_back (iter->first); + if (object->isDeleted()) + deleteList.push_back(iter->first); } // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); + for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); iter++) - managementObjects.erase (*iter); + managementObjects.erase(*iter); - deleteList.clear (); + deleteList.clear(); } -void ManagementAgentImpl::BackgroundThread::run() +void ManagementAgentImpl::ConnectionThread::run() +{ + static const int delayMin(1); + static const int delayMax(128); + static const int delayFactor(2); + int delay(delayMin); + string dest("qmfagent"); + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + + while (true) { + try { + if (!agent.host.empty()) { + connection.open(agent.host.c_str(), agent.port); + session = connection.newSession(queueName.str()); + subscriptions = new client::SubscriptionManager(session); + + session.queueDeclare(arg::queue=queueName.str()); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str()); + + subscriptions->subscribe(agent, queueName.str(), dest); + { + Mutex::ScopedLock _lock(connLock); + operational = true; + agent.startProtocol(); + try { + Mutex::ScopedUnlock _unlock(connLock); + subscriptions->run(); + } catch (std::exception) {} + + operational = false; + } + delay = delayMin; + delete subscriptions; + subscriptions = 0; + session.close(); + } + } catch (std::exception &e) { + if (delay < delayMax) + delay *= delayFactor; + } + + ::sleep(delay); + } +} + +ManagementAgentImpl::ConnectionThread::~ConnectionThread() +{ + if (subscriptions != 0) { + delete subscriptions; + } +} + +void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, + uint32_t length, + string exchange, + string routingKey) +{ + { + Mutex::ScopedLock _lock(connLock); + if (!operational) + return; + } + + Message msg; + string data; + + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData(data); + session.messageTransfer(arg::content=msg, arg::destination=exchange); +} + +void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank) +{ + stringstream key; + key << "agent." << agentBank; + session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); +} + + +void ManagementAgentImpl::PublishThread::run() { while (true) { ::sleep(5); agent.PeriodicProcessing(); } } + +Mutex& ManagementAgentImpl::getMutex() +{ + return agentLock; +} + +Buffer* ManagementAgentImpl::startEventLH() +{ + Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); + EncodeHeader(*outBuffer, 'e'); + outBuffer->putLongLong(uint64_t(Duration(now()))); + return outBuffer; +} + +void ManagementAgentImpl::finishEventLH(Buffer* outBuffer) +{ + uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); + outBuffer->reset(); + connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event"); + delete outBuffer; +} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index f7f19e145d..7d9be6daf9 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -22,7 +22,7 @@ #include "ManagementAgent.h" #include "qpid/client/Connection.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/Session.h" #include "qpid/client/AsyncSession.h" #include "qpid/client/Message.h" @@ -30,10 +30,10 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Condition.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> +#include <deque> namespace qpid { namespace management { @@ -49,14 +49,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void init(std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, - bool useExternalThread = false); + bool useExternalThread = false, + std::string storeFile = ""); void RegisterClass(std::string packageName, std::string className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - uint64_t addObject (management::ManagementObject* objectPtr, - uint32_t persistId = 0, - uint32_t persistBank = 4); + ObjectId addObject (management::ManagementObject* objectPtr, + uint64_t persistId = 0); uint32_t pollCallbacks (uint32_t callLimit = 0); int getSignalFd (void); @@ -64,14 +64,12 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen private: - struct SchemaClassKey - { + struct SchemaClassKey { std::string name; uint8_t hash[16]; }; - struct SchemaClassKeyComp - { + struct SchemaClassKeyComp { bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const { if (lhs.name != rhs.name) @@ -84,53 +82,95 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen } }; - struct SchemaClass - { + struct SchemaClass { management::ManagementObject::writeSchemaCall_t writeSchemaCall; SchemaClass () : writeSchemaCall(0) {} }; + struct QueuedMethod { + QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) : + sequence(_seq), replyTo(_reply), body(_body) {} + + uint32_t sequence; + std::string replyTo; + std::string body; + }; + + typedef std::deque<QueuedMethod*> MethodQueue; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; PackageMap packages; + AgentAttachment attachment; management::ManagementObjectMap managementObjects; management::ManagementObjectMap newManagementObjects; + MethodQueue methodQueue; void received (client::Message& msg); uint16_t interval; bool extThread; + int writeFd; + int readFd; uint64_t nextObjectId; + std::string storeFile; sys::Mutex agentLock; sys::Mutex addLock; - framing::Uuid sessionId; framing::Uuid systemId; + std::string host; + uint16_t port; - int signalFdIn, signalFdOut; - client::Connection connection; - client::Session session; - client::Dispatcher* dispatcher; bool clientWasAdded; - uint64_t objIdPrefix; - std::stringstream queueName; + uint32_t requestedBank; + uint32_t assignedBank; + uint32_t brokerBank; + uint16_t bootSequence; # define MA_BUFFER_SIZE 65536 char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; - class BackgroundThread : public sys::Runnable + friend class ConnectionThread; + class ConnectionThread : public sys::Runnable { + bool operational; ManagementAgentImpl& agent; + framing::Uuid sessionId; + client::Connection connection; + client::Session session; + client::SubscriptionManager* subscriptions; + std::stringstream queueName; + sys::Mutex connLock; void run(); public: - BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {} + ConnectionThread(ManagementAgentImpl& _agent) : + operational(false), agent(_agent), subscriptions(0) {} + ~ConnectionThread(); + void sendBuffer(qpid::framing::Buffer& buf, + uint32_t length, + std::string exchange, + std::string routingKey); + void bindToBank(uint32_t agentBank); }; - BackgroundThread bgThread; - sys::Thread thread; - sys::Condition startupCond; - bool startupWait; + class PublishThread : public sys::Runnable + { + ManagementAgentImpl& agent; + void run(); + public: + PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {} + }; + + ConnectionThread connThreadBody; + sys::Thread connThread; + PublishThread pubThreadBody; + sys::Thread pubThread; + + static const std::string storeMagicNumber; + void startProtocol(); + void storeData(bool requested=false); + void retrieveData(); PackageMap::iterator FindOrAddPackage (std::string name); void moveNewObjectsLH(); void AddClassLocal (PackageMap::iterator pIter, @@ -144,16 +184,19 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen ClassMap::iterator cIter); void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void SendBuffer (qpid::framing::Buffer& buf, - uint32_t length, - std::string exchange, - std::string routingKey); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); void handleAttachResponse (qpid::framing::Buffer& inBuffer); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* outBuffer); }; }} diff --git a/cpp/src/qpid/broker/AclModule.h b/cpp/src/qpid/broker/AclModule.h index ec832daf22..36a3f0baab 100644 --- a/cpp/src/qpid/broker/AclModule.h +++ b/cpp/src/qpid/broker/AclModule.h @@ -33,9 +33,11 @@ namespace qpid { namespace acl { -enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE,OBJECTSIZE}; -enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE, UPDATE, ACTIONSIZE}; -enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE, QUEUENAME}; +enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE, METHOD, OBJECTSIZE}; // OBJECTSIZE must be last in list +enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE, + UPDATE, ACTIONSIZE}; // ACTIONSIZE must be last in list +enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE, + QUEUENAME, SCHEMAPACKAGE, SCHEMACLASS}; enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG}; } // namespace acl @@ -74,6 +76,7 @@ class AclHelper { if (str.compare("broker") == 0) return BROKER; if (str.compare("link") == 0) return LINK; if (str.compare("route") == 0) return ROUTE; + if (str.compare("method") == 0) return METHOD; throw str; } static inline std::string getObjectTypeStr(const ObjectType o) { @@ -83,6 +86,7 @@ class AclHelper { case BROKER: return "broker"; case LINK: return "link"; case ROUTE: return "route"; + case METHOD: return "method"; default: assert(false); // should never get here } } @@ -123,6 +127,8 @@ class AclHelper { if (str.compare("type") == 0) return TYPE; if (str.compare("alternate") == 0) return ALTERNATE; if (str.compare("queuename") == 0) return QUEUENAME; + if (str.compare("schemapackage") == 0) return SCHEMAPACKAGE; + if (str.compare("schemaclass") == 0) return SCHEMACLASS; throw str; } static inline std::string getPropertyStr(const Property p) { @@ -137,6 +143,8 @@ class AclHelper { case TYPE: return "type"; case ALTERNATE: return "alternate"; case QUEUENAME: return "queuename"; + case SCHEMAPACKAGE: return "schemapackage"; + case SCHEMACLASS: return "schemaclass"; default: assert(false); // should never get here } } @@ -231,6 +239,17 @@ class AclHelper { a3->insert(actionPair(DELETE, p0)); map->insert(objectPair(ROUTE, a3)); + + // == Method == + + propSetPtr p5(new propSet); + p5->insert(SCHEMAPACKAGE); + p5->insert(SCHEMACLASS); + + actionMapPtr a4(new actionMap); + a4->insert(actionPair(ACCESS, p5)); + + map->insert(objectPair(METHOD, a4)); } }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e983aee5c9..94a392b921 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -158,10 +158,12 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_stagingThreshold (conf.stagingThreshold); mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); mgmtObject->set_version (PACKAGE_VERSION); - mgmtObject->set_dataDirEnabled (dataDir.isEnabled ()); - mgmtObject->set_dataDir (dataDir.getPath ()); + if (dataDir.isEnabled()) + mgmtObject->set_dataDir(dataDir.getPath()); + else + mgmtObject->clr_dataDir(); - managementAgent->addObject (mgmtObject, 2, 1); + managementAgent->addObject (mgmtObject, 0x1000000000000002LL); // Since there is currently no support for virtual hosts, a placeholder object // representing the implied single virtual host is added here to keep the diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index fbfcaede82..6416e2fc73 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -57,9 +57,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new management::Exchange (agent, this, parent, _name, durable); if (!durable) { if (name == "") - agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID + agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID else if (name == "qpid.management") - agent->addObject (mgmtExchange, 5, 1); // Special management exchange ID + agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID else agent->addObject (mgmtExchange); } @@ -78,7 +78,7 @@ void Exchange::setPersistenceId(uint64_t id) const if (mgmtExchange != 0 && persistenceId == 0) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - agent->addObject (mgmtExchange, id, 2); + agent->addObject (mgmtExchange, 0x2000000000000000LL + id); } persistenceId = id; } @@ -130,7 +130,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang ManagementObject* mo = queue->GetManagementObject(); if (mo != 0) { - uint64_t queueId = mo->getObjectId(); + management::ObjectId queueId = mo->getObjectId(); mgmtBinding = new management::Binding (agent, this, (Manageable*) parent, queueId, key, args); agent->addObject (mgmtBinding); } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40dfb80da2..090c4b4bca 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -618,7 +618,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const if (mgmtObject != 0 && persistenceId == 0) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - agent->addObject (mgmtObject, _persistenceId, 3); + agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId); if (externalQueueStore) { ManagementObject* childObj = externalQueueStore->GetManagementObject(); diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index 6c58339432..d562c43069 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -73,7 +73,7 @@ System::System (string _dataDir) : mgmtObject(0) mgmtObject->set_machine (std::string (_uname.machine)); } - agent->addObject (mgmtObject, 1, 1); + agent->addObject (mgmtObject, 0x1000000000000001LL); } } diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index 23203ec13e..c0eb6f03ed 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0) if (agent != 0) { mgmtObject = new management::Vhost (agent, this, parentBroker, "/"); - agent->addObject (mgmtObject, 3, 1); + agent->addObject (mgmtObject, 0x1000000000000003LL); } } } diff --git a/cpp/src/qpid/framing/FieldTable.h b/cpp/src/qpid/framing/FieldTable.h index 3c65d31aee..ed27f3fef6 100644 --- a/cpp/src/qpid/framing/FieldTable.h +++ b/cpp/src/qpid/framing/FieldTable.h @@ -58,6 +58,7 @@ class FieldTable int count() const; void set(const std::string& name, const ValuePtr& value); ValuePtr get(const std::string& name) const; + bool isSet(const std::string& name) const { return get(name).get() != 0; } void setString(const std::string& name, const std::string& value); void setInt(const std::string& name, int value); diff --git a/cpp/src/qpid/management/Manageable.cpp b/cpp/src/qpid/management/Manageable.cpp index 0f3fbab55c..19fba87fde 100644 --- a/cpp/src/qpid/management/Manageable.cpp +++ b/cpp/src/qpid/management/Manageable.cpp @@ -31,6 +31,7 @@ std::string Manageable::StatusText (status_t status) case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; case STATUS_INVALID_PARAMETER : return "InvalidParameter"; case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented"; + case STATUS_FORBIDDEN : return "Forbidden"; } return "??"; diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h index e2b8980465..4e2f33b625 100644 --- a/cpp/src/qpid/management/Manageable.h +++ b/cpp/src/qpid/management/Manageable.h @@ -44,6 +44,7 @@ class Manageable static const status_t STATUS_NOT_IMPLEMENTED = 3; static const status_t STATUS_INVALID_PARAMETER = 4; static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5; + static const status_t STATUS_FORBIDDEN = 6; // Every "Manageable" object must hold a reference to exactly one // management object. This object is always of a class derived from diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 1bdd8ab836..17f5c14592 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -27,6 +27,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" +#include "qpid/broker/AclModule.h" #include <list> #include <iostream> #include <fstream> @@ -80,8 +81,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent () ManagementBroker::ManagementBroker () : threadPoolSize(1), interval(10), broker(0) { - localBank = 5; nextObjectId = 1; + brokerBank = 1; bootSequence = 1; nextRemoteBank = 10; nextRequestSequence = 1; @@ -112,7 +113,7 @@ ManagementBroker::~ManagementBroker () } } -void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) +void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads) { dataDir = _dataDir; interval = _interval; @@ -140,7 +141,10 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; + if (bootSequence & 0xF000) + bootSequence = 1; writeData(); } else @@ -183,29 +187,26 @@ void ManagementBroker::RegisterClass (string packageName, AddClass(pIter, className, md5Sum, schemaCall); } -uint64_t ManagementBroker::addObject (ManagementObject* object, - uint32_t persistId, - uint32_t persistBank) +ObjectId ManagementBroker::addObject (ManagementObject* object, + uint64_t persistId) { Mutex::ScopedLock lock (addLock); - uint64_t objectId; + uint16_t sequence; + uint64_t objectNum; - if (persistId == 0) - { - objectId = ((uint64_t) bootSequence) << 48 | - ((uint64_t) localBank) << 24 | nextObjectId++; - if ((nextObjectId & 0xFF000000) != 0) - { - nextObjectId = 1; - localBank++; - } + if (persistId == 0) { + sequence = bootSequence; + objectNum = nextObjectId++; + } else { + sequence = 0; + objectNum = persistId; } - else - objectId = ((uint64_t) persistBank) << 24 | persistId; - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; + ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); + + object->setObjectId(objId); + newManagementObjects[objId] = object; + return objId; } ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) @@ -308,7 +309,7 @@ void ManagementBroker::PeriodicProcessing (void) char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; - std::list<uint64_t> deleteList; + std::list<ObjectId> deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); @@ -373,7 +374,7 @@ void ManagementBroker::PeriodicProcessing (void) } // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin (); iter != deleteList.rend (); iter++) managementObjects.erase (*iter); @@ -408,48 +409,72 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // - // agent.<X>.# - // broker.# - // - // where <X> is any non-negative decimal integer less than the lowest remote - // object-id bank. + // agent.0.# + // broker if (routingKey == "broker") { - dispatchAgentCommandLH (msg); + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 7, "agent.0") == 0) { + dispatchAgentCommandLH(msg); return false; } else if (routingKey.compare(0, 6, "agent.") == 0) { - std::string::size_type delim = routingKey.find('.', 6); - if (delim == string::npos) - delim = routingKey.length(); - string bank = routingKey.substr(6, delim - 6); - if ((uint32_t) atoi(bank.c_str()) <= localBank) { - dispatchAgentCommandLH (msg); - return false; - } + return authorizeAgentMessageLH(msg); } return true; } -void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, + uint32_t sequence, const ConnectionToken* connToken) { string methodName; + string packageName; + string className; + uint8_t hash[16]; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + AclModule* acl = broker->getAcl(); - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + if (acl != 0) { + string userId = ((const broker::ConnectionState*) connToken)->getUserId(); + std::map<acl::Property, string> params; + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + return; + } + } + ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -497,34 +522,33 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey FindOrAddPackageLH(packageName); } -void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { std::string packageName; - inBuffer.getShortString (packageName); - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) + inBuffer.getShortString(packageName); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin (); - cIter != cMap.end (); + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); cIter++) { - if (cIter->second->hasSchema ()) + if (cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q', sequence); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + EncodeHeader(outBuffer, 'q', sequence); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } } } - - sendCommandComplete (replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) @@ -551,9 +575,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); - SchemaClass* newSchema = new SchemaClass; - newSchema->pendingSequence = sequence; - pIter->second[key] = newSchema; + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence))); } } @@ -569,7 +591,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(buffer, bufferLen); } -void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -578,33 +600,33 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getShortString (key.name); inBuffer.getBin128 (key.hash); - PackageMap::iterator pIter = packages.find (packageName); + PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap cMap = pIter->second; - ClassMap::iterator cIter = cMap.find (key); + ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - SchemaClass* classInfo = cIter->second; + SchemaClass& classInfo = cIter->second; - if (classInfo->hasSchema()) { + if (classInfo.hasSchema()) { EncodeHeader(outBuffer, 's', sequence); - classInfo->appendSchema (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + classInfo.appendSchema(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } else - sendCommandComplete (replyToKey, sequence, 1, "Schema not available"); + sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); } else - sendCommandComplete (replyToKey, sequence, 1, "Class key not found"); + sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); } else - sendCommandComplete (replyToKey, sequence, 1, "Package not found"); + sendCommandComplete(replyToKey, sequence, 1, "Package not found"); } -void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -619,24 +641,26 @@ void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyT if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { size_t length = ValidateSchema(inBuffer); - if (length == 0) + if (length == 0) { + QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); + } else { - cIter->second->buffer = (uint8_t*) malloc(length); - cIter->second->bufferLen = length; - inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + cIter->second.buffer = (uint8_t*) malloc(length); + cIter->second.bufferLen = length; + inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); // Publish a class-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q'); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + EncodeHeader(outBuffer, 'q'); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } @@ -671,14 +695,14 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) void ManagementBroker::deleteOrphanedAgentsLH() { - vector<uint64_t> deleteList; + vector<ObjectId> deleteList; for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { - uint64_t connectionRef = aIter->first; + ObjectId connectionRef = aIter->first; bool found = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { if (iter->first == connectionRef && !iter->second->isDeleted()) { found = true; @@ -692,10 +716,8 @@ void ManagementBroker::deleteOrphanedAgentsLH() } } - for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { - + for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) remoteAgents.erase(*dIter); - } deleteList.clear(); } @@ -705,7 +727,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe string label; uint32_t requestedBank; uint32_t assignedBank; - uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; moveNewObjectsLH(); @@ -741,6 +763,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe uint32_t outLen; EncodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); @@ -786,13 +809,77 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::dispatchAgentCommandLH (Message& msg) +bool ManagementBroker::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; uint32_t sequence; string replyToKey; + if (msg.encodedSize() > MA_BUFFER_SIZE) + return false; + + msg.encodeContent(inBuffer); + inBuffer.reset(); + + if (!CheckHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + // TODO: check method call against ACL list. + AclModule* acl = broker->getAcl(); + if (acl == 0) + return true; + + string userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId(); + string packageName; + string className; + uint8_t hash[16]; + string methodName; + + std::map<acl::Property, string> params; + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) + return true; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + return false; + } + + return true; +} + +void ManagementBroker::dispatchAgentCommandLH(Message& msg) +{ + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { @@ -823,7 +910,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) @@ -834,7 +921,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std: // No such package found, create a new map entry. pair<PackageMap::iterator, bool> result = - packages.insert (pair<string, ClassMap> (name, ClassMap ())); + packages.insert(pair<string, ClassMap>(name, ClassMap())); QPID_LOG (debug, "ManagementBroker added package " << name); // Publish a package-indication message @@ -859,20 +946,18 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter, ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - SchemaClass* classInfo = new SchemaClass; - classInfo->writeSchemaCall = schemaCall; - cMap[key] = classInfo; - cIter = cMap.find (key); + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall))); + cIter = cMap.find(key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, @@ -917,6 +1002,8 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) for (uint16_t idx = 0; idx < methCount; idx++) { FieldTable ft; ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; int argCount = ft.getInt("argCount"); for (int mIdx = 0; mIdx < argCount; mIdx++) { FieldTable aft; @@ -924,10 +1011,41 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) } } - if (evntCount != 0) - return 0; + for (uint16_t idx = 0; idx < evntCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } end = inBuffer.getPosition(); inBuffer.restore(); // restore original position return end - start; } + +Mutex& ManagementBroker::getMutex() +{ + return userLock; +} + +Buffer* ManagementBroker::startEventLH() +{ + Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); + EncodeHeader(*outBuffer, 'e'); + outBuffer->putLongLong(uint64_t(Duration(now()))); + return outBuffer; +} + +void ManagementBroker::finishEventLH(Buffer* outBuffer) +{ + uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); + outBuffer->reset(); + SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event"); + delete outBuffer; +} + diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 151926f526..e3b5504752 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -47,7 +47,7 @@ class ManagementBroker : public ManagementAgent ManagementBroker (); virtual ~ManagementBroker (); - void configure (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); + void configure (std::string dataDir, uint16_t interval, broker::Broker* broker, int threadPoolSize); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); @@ -56,16 +56,15 @@ class ManagementBroker : public ManagementAgent std::string className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - uint64_t addObject (ManagementObject* object, - uint32_t persistId = 0, - uint32_t persistBank = 4); + ObjectId addObject (ManagementObject* object, + uint64_t persistId = 0); void clientAdded (void); bool dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); // Stubs for remote management agent calls - void init (std::string, uint16_t, uint16_t, bool) { assert(0); } + void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); } uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } @@ -88,7 +87,7 @@ class ManagementBroker : public ManagementAgent { uint32_t objIdBank; std::string routingKey; - uint64_t connectionRef; + ObjectId connectionRef; Agent* mgmtObject; ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); @@ -97,7 +96,7 @@ class ManagementBroker : public ManagementAgent // TODO: Eventually replace string with entire reply-to structure. reply-to // currently assumes that the exchange is "amq.direct" even though it could // in theory be specified differently. - typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap; + typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap; typedef std::vector<std::string> ReplyToVector; // Storage for known schema classes: @@ -133,12 +132,15 @@ class ManagementBroker : public ManagementAgent size_t bufferLen; uint8_t* buffer; - SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {} + SchemaClass(uint32_t seq) : + writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(ManagementObject::writeSchemaCall_t call) : + writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } void appendSchema (framing::Buffer& buf); }; - typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap; + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; RemoteAgentMap remoteAgents; @@ -157,10 +159,10 @@ class ManagementBroker : public ManagementAgent broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; - Manageable* broker; + broker::Broker* broker; uint16_t bootSequence; - uint32_t localBank; uint32_t nextObjectId; + uint32_t brokerBank; uint32_t nextRemoteBank; uint32_t nextRequestSequence; bool clientWasAdded; @@ -168,6 +170,7 @@ class ManagementBroker : public ManagementAgent # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; void writeData (); void PeriodicProcessing (void); @@ -179,7 +182,8 @@ class ManagementBroker : public ManagementAgent std::string routingKey); void moveNewObjectsLH(); - void dispatchAgentCommandLH (broker::Message& msg); + bool authorizeAgentMessageLH(broker::Message& msg); + void dispatchAgentCommandLH(broker::Message& msg); PackageMap::iterator FindOrAddPackageLH(std::string name); void AddClass(PackageMap::iterator pIter, @@ -206,9 +210,12 @@ class ManagementBroker : public ManagementAgent void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken); size_t ValidateSchema(framing::Buffer&); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* outBuffer); }; }} diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 74d9571d10..e0386ee057 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -28,6 +28,62 @@ using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; +void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) +{ + first = + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (bank & 0x0fffffff)); +} + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object) + : agent(0) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (bank & 0x0fffffff)); + second = object; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object) + : agent(_agent) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48; + second = object; +} + +bool ObjectId::operator==(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return first == otherFirst && second == other.second; +} + +bool ObjectId::operator<(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return (first < otherFirst) || ((first == otherFirst) && (second < other.second)); +} + +void ObjectId::encode(framing::Buffer& buffer) +{ + if (agent == 0) + buffer.putLongLong(first); + else + buffer.putLongLong(first | agent->first); + buffer.putLongLong(second); +} + +void ObjectId::decode(framing::Buffer& buffer) +{ + first = buffer.getLongLong(); + second = buffer.getLongLong(); +} + int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (Buffer& buf) @@ -38,10 +94,10 @@ void ManagementObject::writeTimestamps (Buffer& buf) buf.putLongLong (uint64_t (Duration (now ()))); buf.putLongLong (createTime); buf.putLongLong (destroyTime); - buf.putLongLong (objectId); + objectId.encode(buf); } -void ManagementObject::setReference(uint64_t) {} +void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { static __thread int thisIndex = -1; @@ -54,3 +110,17 @@ int ManagementObject::getThreadIndex() { return thisIndex; } +Mutex& ManagementObject::getMutex() +{ + return agent->getMutex(); +} + +Buffer* ManagementObject::startEventLH() +{ + return agent->startEventLH(); +} + +void ManagementObject::finishEventLH(Buffer* buf) +{ + agent->finishEventLH(buf); +} diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 78d065aac2..1b809f5125 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -32,6 +32,34 @@ namespace management { class Manageable; class ManagementAgent; +class ObjectId; + + +class AgentAttachment { + friend class ObjectId; +private: + uint64_t first; +public: + AgentAttachment() : first(0) {} + void setBanks(uint32_t broker, uint32_t bank); +}; + + +class ObjectId { +private: + const AgentAttachment* agent; + uint64_t first; + uint64_t second; +public: + ObjectId() : agent(0), first(0), second(0) {} + ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); } + ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object); + ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object); + bool operator==(const ObjectId &other) const; + bool operator<(const ObjectId &other) const; + void encode(framing::Buffer& buffer); + void decode(framing::Buffer& buffer); +}; class ManagementObject { @@ -39,7 +67,7 @@ class ManagementObject uint64_t createTime; uint64_t destroyTime; - uint64_t objectId; + ObjectId objectId; bool configChanged; bool instChanged; bool deleted; @@ -84,11 +112,15 @@ class ManagementObject int getThreadIndex(); void writeTimestamps (qpid::framing::Buffer& buf); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* buf); + public: typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); ManagementObject (ManagementAgent* _agent, Manageable* _core) : - destroyTime(0), objectId (0), configChanged(true), + destroyTime(0), configChanged(true), instChanged(true), deleted(false), coreObject(_core), agent(_agent) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} @@ -100,14 +132,14 @@ class ManagementObject virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; - virtual void setReference (uint64_t objectId); + virtual void setReference (ObjectId objectId); virtual std::string& getClassName (void) = 0; virtual std::string& getPackageName (void) = 0; virtual uint8_t* getMd5Sum (void) = 0; - void setObjectId (uint64_t oid) { objectId = oid; } - uint64_t getObjectId (void) { return objectId; } + void setObjectId (ObjectId oid) { objectId = oid; } + ObjectId getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } virtual bool getInstChanged (void) { return instChanged; } inline void setAllChanged (void) { @@ -120,10 +152,9 @@ class ManagementObject deleted = true; } inline bool isDeleted (void) { return deleted; } - inline sys::Mutex& getLock() { return accessLock; } }; -typedef std::map<uint64_t,ManagementObject*> ManagementObjectMap; +typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; }} |