diff options
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp')
-rw-r--r-- | trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp | 933 |
1 files changed, 933 insertions, 0 deletions
diff --git a/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp new file mode 100644 index 0000000000..1bdd8ab836 --- /dev/null +++ b/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -0,0 +1,933 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementBroker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageDelivery.h> +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/sys/Time.h" +#include "qpid/broker/ConnectionState.h" +#include <list> +#include <iostream> +#include <fstream> + +using boost::intrusive_ptr; +using qpid::framing::Uuid; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::sys; +using namespace std; + +Mutex ManagementAgent::Singleton::lock; +bool ManagementAgent::Singleton::disabled = false; +ManagementAgent* ManagementAgent::Singleton::agent = 0; +int ManagementAgent::Singleton::refCount = 0; + +ManagementAgent::Singleton::Singleton(bool disableManagement) +{ + Mutex::ScopedLock _lock(lock); + if (disableManagement && !disabled) { + disabled = true; + assert(refCount == 0); // can't disable after agent has been allocated + } + if (refCount == 0 && !disabled) + agent = new ManagementBroker(); + refCount++; +} + +ManagementAgent::Singleton::~Singleton() +{ + Mutex::ScopedLock _lock(lock); + refCount--; + if (refCount == 0 && !disabled) { + delete agent; + agent = 0; + } +} + +ManagementAgent* ManagementAgent::Singleton::getInstance() +{ + return agent; +} + +ManagementBroker::RemoteAgent::~RemoteAgent () +{ + if (mgmtObject != 0) + mgmtObject->resourceDestroy(); +} + +ManagementBroker::ManagementBroker () : + threadPoolSize(1), interval(10), broker(0) +{ + localBank = 5; + nextObjectId = 1; + bootSequence = 1; + nextRemoteBank = 10; + nextRequestSequence = 1; + clientWasAdded = false; +} + +ManagementBroker::~ManagementBroker () +{ + timer.stop(); + { + Mutex::ScopedLock lock (userLock); + + // Reset the shared pointers to exchanges. If this is not done now, the exchanges + // will stick around until dExchange and mExchange are implicitely destroyed (long + // after this destructor completes). Those exchanges hold references to management + // objects that will be invalid. + dExchange.reset(); + mExchange.reset(); + + moveNewObjectsLH(); + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + delete object; + } + managementObjects.clear(); + } +} + +void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) +{ + dataDir = _dataDir; + interval = _interval; + broker = _broker; + threadPoolSize = _threads; + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); + + // Get from file or generate and save to file. + if (dataDir.empty()) + { + uuid.generate(); + QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); + + if (inFile.good()) + { + inFile >> uuid; + inFile >> bootSequence; + inFile >> nextRemoteBank; + inFile.close(); + QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + + bootSequence++; + writeData(); + } + else + { + uuid.generate(); + QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); + writeData(); + } + + QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); + } +} + +void ManagementBroker::writeData () +{ + string filename (dataDir + "/.mbrokerdata"); + ofstream outFile (filename.c_str ()); + + if (outFile.good()) + { + outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; + outFile.close(); + } +} + +void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange, + broker::Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementBroker::RegisterClass (string packageName, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + AddClass(pIter, className, md5Sum, schemaCall); +} + +uint64_t ManagementBroker::addObject (ManagementObject* object, + uint32_t persistId, + uint32_t persistBank) +{ + Mutex::ScopedLock lock (addLock); + uint64_t objectId; + + if (persistId == 0) + { + objectId = ((uint64_t) bootSequence) << 48 | + ((uint64_t) localBank) << 24 | nextObjectId++; + if ((nextObjectId & 0xFF000000) != 0) + { + nextObjectId = 1; + localBank++; + } + } + else + objectId = ((uint64_t) persistBank) << 24 | persistId; + + object->setObjectId (objectId); + newManagementObjects[objectId] = object; + return objectId; +} + +ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) + : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} + +ManagementBroker::Periodic::~Periodic () {} + +void ManagementBroker::Periodic::fire () +{ + broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); + broker.PeriodicProcessing (); +} + +void ManagementBroker::clientAdded (void) +{ + Mutex::ScopedLock lock (userLock); + + clientWasAdded = true; + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'x'); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); + } +} + +void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + + *opcode = buf.getOctet (); + *seq = buf.getLong (); + + return h1 == 'A' && h2 == 'M' && h3 == '1'; +} + +void ManagementBroker::SendBuffer (Buffer& buf, + uint32_t length, + broker::Exchange::shared_ptr exchange, + string routingKey) +{ + if (exchange.get() == 0) + return; + + intrusive_ptr<Message> msg (new Message ()); + AMQFrame method (in_place<MessageTransferBody>( + ProtocolVersion(), exchange->getName (), 0, 0)); + AMQFrame header (in_place<AMQHeaderBody>()); + AMQFrame content(in_place<AMQContentBody>()); + + content.castBody<AMQContentBody>()->decode(buf, length); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(length); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + exchange->route (deliverable, routingKey, 0); +} + +void ManagementBroker::moveNewObjectsLH() +{ + Mutex::ScopedLock lock (addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); + iter != newManagementObjects.end (); + iter++) + managementObjects[iter->first] = iter->second; + newManagementObjects.clear(); +} + +void ManagementBroker::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 + Mutex::ScopedLock lock (userLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + std::list<uint64_t> deleteList; + + { + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + uuid.str() + ".heartbeat"; + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + moveNewObjectsLH(); + + if (clientWasAdded) + { + clientWasAdded = false; + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + object->setAllChanged (); + } + } + + if (managementObjects.empty ()) + return; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + + if (object->getConfigChanged () || object->isDeleted ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'c'); + object->writeProperties(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->getInstChanged ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + } + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + managementObjects.erase (*iter); + + if (!deleteList.empty()) { + deleteList.clear(); + deleteOrphanedAgentsLH(); + } +} + +void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, + uint32_t code, string text) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +bool ManagementBroker::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/) +{ + Mutex::ScopedLock lock (userLock); + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.<X>.# + // broker.# + // + // where <X> is any non-negative decimal integer less than the lowest remote + // object-id bank. + + if (routingKey == "broker") { + dispatchAgentCommandLH (msg); + return false; + } + + else if (routingKey.compare(0, 6, "agent.") == 0) { + std::string::size_type delim = routingKey.find('.', 6); + if (delim == string::npos) + delim = routingKey.length(); + string bank = routingKey.substr(6, delim - 6); + if ((uint32_t) atoi(bank.c_str()) <= localBank) { + dispatchAgentCommandLH (msg); + return false; + } + } + + return true; +} + +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string methodName; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); + + EncodeHeader(outBuffer, 'm', sequence); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) +{ + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p', sequence); + EncodePackageIndication (outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) +{ + std::string packageName; + + inBuffer.getShortString(packageName); + FindOrAddPackageLH(packageName); +} + +void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin (); + cIter != cMap.end (); + cIter++) + { + if (cIter->second->hasSchema ()) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q', sequence); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +{ + std::string packageName; + SchemaClassKey key; + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + ClassMap::iterator cIter = pIter->second.find(key); + if (cIter == pIter->second.end()) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint32_t sequence = nextRequestSequence++; + + EncodeHeader (outBuffer, 'S', sequence); + outBuffer.putShortString(packageName); + outBuffer.putShortString(key.name); + outBuffer.putBin128(key.hash); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + + SchemaClass* newSchema = new SchemaClass; + newSchema->pendingSequence = sequence; + pIter->second[key] = newSchema; + } +} + +void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) +{ + // If the management package is attached locally (embedded in the broker or + // linked in via plug-in), call the schema handler directly. If the package + // is from a remote management agent, send the stored schema information. + + if (writeSchemaCall != 0) + writeSchemaCall(buf); + else + buf.putRawData(buffer, bufferLen); +} + +void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end()) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + SchemaClass* classInfo = cIter->second; + + if (classInfo->hasSchema()) { + EncodeHeader(outBuffer, 's', sequence); + classInfo->appendSchema (outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + else + sendCommandComplete (replyToKey, sequence, 1, "Schema not available"); + } + else + sendCommandComplete (replyToKey, sequence, 1, "Class key not found"); + } + else + sendCommandComplete (replyToKey, sequence, 1, "Package not found"); +} + +void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.record(); + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + inBuffer.restore(); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + size_t length = ValidateSchema(inBuffer); + if (length == 0) + cMap.erase(key); + else { + cIter->second->buffer = (uint8_t*) malloc(length); + cIter->second->bufferLen = length; + inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + + // Publish a class-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q'); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + } + } + } +} + +bool ManagementBroker::bankInUse (uint32_t bank) +{ + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) + if (aIter->second->objIdBank == bank) + return true; + return false; +} + +uint32_t ManagementBroker::allocateNewBank () +{ + while (bankInUse (nextRemoteBank)) + nextRemoteBank++; + + uint32_t allocated = nextRemoteBank++; + writeData (); + return allocated; +} + +uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) +{ + if (requestedBank == 0 || bankInUse (requestedBank)) + return allocateNewBank (); + return requestedBank; +} + +void ManagementBroker::deleteOrphanedAgentsLH() +{ + vector<uint64_t> deleteList; + + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { + uint64_t connectionRef = aIter->first; + bool found = false; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + if (iter->first == connectionRef && !iter->second->isDeleted()) { + found = true; + break; + } + } + + if (!found) { + deleteList.push_back(connectionRef); + delete aIter->second; + } + } + + for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { + + remoteAgents.erase(*dIter); + } + + deleteList.clear(); +} + +void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) +{ + string label; + uint32_t requestedBank; + uint32_t assignedBank; + uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + Uuid systemId; + + moveNewObjectsLH(); + deleteOrphanedAgentsLH(); + RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); + if (aIter != remoteAgents.end()) { + // There already exists an agent on this session. Reject the request. + sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent"); + return; + } + + inBuffer.getShortString (label); + systemId.decode (inBuffer); + requestedBank = inBuffer.getLong (); + assignedBank = assignBankLH (requestedBank); + + RemoteAgent* agent = new RemoteAgent; + agent->objIdBank = assignedBank; + agent->routingKey = replyToKey; + agent->connectionRef = connectionRef; + agent->mgmtObject = new management::Agent (this, agent); + agent->mgmtObject->set_connectionRef(agent->connectionRef); + agent->mgmtObject->set_label (label); + agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); + agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_objectIdBank (assignedBank); + addObject (agent->mgmtObject); + + remoteAgents[connectionRef] = agent; + + // Send an Attach Response + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (assignedBank); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + ft.decode(inBuffer); + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo<string>()) + { + // TODO: Send completion with an error code + return; + } + + string className (value->get<string>()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + if (object->getClassName () == className) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::dispatchAgentCommandLH (Message& msg) +{ + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + } + else + return; + + if (msg.encodedSize() > MA_BUFFER_SIZE) { + QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " << + msg.encodedSize()); + return; + } + + msg.encodeContent(inBuffer); + inBuffer.reset(); + + if (!CheckHeader(inBuffer, &opcode, &sequence)) + return; + + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); +} + +ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert (pair<string, ClassMap> (name, ClassMap ())); + QPID_LOG (debug, "ManagementBroker added package " << name); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); + + return result.first; +} + +void ManagementBroker::AddClass(PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy (&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << + key.name); + SchemaClass* classInfo = new SchemaClass; + + classInfo->writeSchemaCall = schemaCall; + cMap[key] = classInfo; + cIter = cMap.find (key); +} + +void ManagementBroker::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementBroker::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); +} + +size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + inBuffer.record(); + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); + uint16_t evntCount = inBuffer.getShort(); + + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } + + if (evntCount != 0) + return 0; + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} |