diff options
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp')
-rw-r--r-- | trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp | 1295 |
1 files changed, 1295 insertions, 0 deletions
diff --git a/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp new file mode 100644 index 0000000000..b5ed4ed405 --- /dev/null +++ b/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -0,0 +1,1295 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementAgent.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include <qpid/broker/Message.h> +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/sys/Time.h" +#include "qpid/broker/ConnectionState.h" +#include "qpid/broker/AclModule.h" +#include <list> +#include <iostream> +#include <fstream> + +using boost::intrusive_ptr; +using qpid::framing::Uuid; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::sys; +using namespace std; +namespace _qmf = qmf::org::apache::qpid::broker; + +ManagementAgent::RemoteAgent::~RemoteAgent () +{ + QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); + if (mgmtObject != 0) { + mgmtObject->resourceDestroy(); + agent.deleteObjectNowLH(mgmtObject->getObjectId()); + } +} + +ManagementAgent::ManagementAgent () : + threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now()))) +{ + nextObjectId = 1; + brokerBank = 1; + bootSequence = 1; + nextRemoteBank = 10; + nextRequestSequence = 1; + clientWasAdded = false; +} + +ManagementAgent::~ManagementAgent () +{ + { + Mutex::ScopedLock lock (userLock); + + // Reset the shared pointers to exchanges. If this is not done now, the exchanges + // will stick around until dExchange and mExchange are implicitely destroyed (long + // after this destructor completes). Those exchanges hold references to management + // objects that will be invalid. + dExchange.reset(); + mExchange.reset(); + + moveNewObjectsLH(); + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + delete object; + } + managementObjects.clear(); + } +} + +void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, + qpid::broker::Broker* _broker, int _threads) +{ + dataDir = _dataDir; + interval = _interval; + broker = _broker; + timer = &_broker->getTimer(); + threadPoolSize = _threads; + ManagementObject::maxThreads = threadPoolSize; + timer->add (new Periodic(*this, interval)); + + // Get from file or generate and save to file. + if (dataDir.empty()) + { + uuid.generate(); + QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); + + if (inFile.good()) + { + inFile >> uuid; + inFile >> bootSequence; + inFile >> nextRemoteBank; + inFile.close(); + QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); + + // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. + bootSequence++; + if (bootSequence & 0xF000) + bootSequence = 1; + writeData(); + } + else + { + uuid.generate(); + QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); + writeData(); + } + + QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); + } +} + +void ManagementAgent::writeData () +{ + string filename (dataDir + "/.mbrokerdata"); + ofstream outFile (filename.c_str ()); + + if (outFile.good()) + { + outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; + outFile.close(); + } +} + +void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, + qpid::broker::Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementAgent::registerClass (const string& packageName, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} + +void ManagementAgent::registerEvent (const string& packageName, + const string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); +} + +ObjectId ManagementAgent::addObject(ManagementObject* object, + uint64_t persistId, + bool publishNow) +{ + Mutex::ScopedLock lock (addLock); + uint16_t sequence; + uint64_t objectNum; + + if (persistId == 0) { + sequence = bootSequence; + objectNum = nextObjectId++; + } else { + sequence = 0; + objectNum = persistId; + } + + ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); + + object->setObjectId(objId); + newManagementObjects[objId] = object; + + if (publishNow) { +#define IMM_BUFSIZE 65536 + char rawBuf[IMM_BUFSIZE]; + Buffer msgBuffer(rawBuf, IMM_BUFSIZE); + + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + uint32_t contentSize = msgBuffer.getPosition(); + stringstream key; + key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + msgBuffer.reset(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str()); + } + + return objId; +} + +void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) +{ + Mutex::ScopedLock lock (userLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putOctet(sev); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, + "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); +} + +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), agent(_agent) {} + +ManagementAgent::Periodic::~Periodic () {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer->add (new Periodic (agent, agent.interval)); + agent.periodicProcessing (); +} + +void ManagementAgent::clientAdded (const std::string& routingKey) +{ + if (routingKey.find("console") != 0) + return; + + clientWasAdded = true; + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) { + char localBuffer[16]; + Buffer outBuffer(localBuffer, 16); + uint32_t outLen; + + encodeHeader(outBuffer, 'x'); + outLen = outBuffer.getPosition(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey); + QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey); + } +} + +void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('2'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '2'; +} + +void ManagementAgent::sendBuffer(Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) +{ + if (exchange.get() == 0) + return; + + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + + content.castBody<AMQContentBody>()->decode(buf, length); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(length); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} +} + +void ManagementAgent::moveNewObjectsLH() +{ + Mutex::ScopedLock lock (addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); + iter != newManagementObjects.end (); + iter++) + managementObjects[iter->first] = iter->second; + newManagementObjects.clear(); +} + +void ManagementAgent::periodicProcessing (void) +{ +#define BUFSIZE 65536 +#define HEADROOM 4096 + Mutex::ScopedLock lock (userLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + list<pair<ObjectId, ManagementObject*> > deleteList; + + uint64_t uptime = uint64_t(Duration(now())) - startTime; + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + + moveNewObjectsLH(); + + // + // Clear the been-here flag on all objects in the map. + // + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + object->setFlags(0); + if (clientWasAdded) { + object->setForcePublish(true); + } + } + + clientWasAdded = false; + + // + // Process the entire object map. + // + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + uint32_t pcount = 0; + uint32_t scount = 0; + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->getForcePublish() && + !baseObject->isDeleted())) + continue; + + Buffer msgBuffer(msgChars, BUFSIZE); + for (ManagementObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + pcount++; + } + + if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + encodeHeader(msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + scount++; + } + + if (object->isDeleted()) + deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); + object->setForcePublish(false); + + if (msgBuffer.available() < HEADROOM) + break; + } + } + + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } + + // Delete flagged objects + for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) { + delete iter->second; + managementObjects.erase(iter->first); + } + + if (!deleteList.empty()) { + deleteList.clear(); + deleteOrphanedAgentsLH(); + } + + { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "console.heartbeat.1.0"; + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); + } +} + +void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) +{ + ManagementObjectMap::iterator iter = managementObjects.find(oid); + if (iter == managementObjects.end()) + return; + ManagementObject* object = iter->second; + if (!object->isDeleted()) + return; + +#define DNOW_BUFSIZE 2048 + char msgChars[DNOW_BUFSIZE]; + uint32_t contentSize; + Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + contentSize = msgBuffer.getPosition(); + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + + managementObjects.erase(oid); +} + +void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, + uint32_t code, string text) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << + replyToKey << " seq=" << sequence); +} + +bool ManagementAgent::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/) +{ + Mutex::ScopedLock lock (userLock); + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.1.0.# + // broker + // schema.# + + if (routingKey == "broker") { + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 9, "agent.1.0") == 0) { + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 8, "agent.1.") == 0) { + return authorizeAgentMessageLH(msg); + } + + else if (routingKey.compare(0, 7, "schema.") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + + return true; +} + +void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, + uint32_t sequence, const ConnectionToken* connToken) +{ + string methodName; + string packageName; + string className; + uint8_t hash[16]; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + AclModule* acl = broker->getAcl(); + + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + methodName << " replyTo=" << replyToKey); + + encodeHeader(outBuffer, 'm', sequence); + + DisallowedMethods::const_iterator i = disallowed.find(std::make_pair(className, methodName)); + if (i != disallowed.end()) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(i->second); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) + return; + } + + if (acl != 0) { + string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + map<acl::Property, string> params; + params[acl::PROP_SCHEMAPACKAGE] = packageName; + params[acl::PROP_SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) + return; + } + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); + } + else + try { + outBuffer.record(); + Mutex::ScopedUnlock u(userLock); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putMediumString(e.what()); + } + } + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); +} + +void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey); + + encodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey); +} + +void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) +{ + QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey); + + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence); + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + + inBuffer.getShortString(packageName); + + QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + + findOrAddPackageLH(packageName); +} + +void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + + inBuffer.getShortString(packageName); + + QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) + { + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); + cIter++) + { + if (cIter->second.hasSchema()) + { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name << + "(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence); + } + } + } + sendCommandComplete(replyToKey, sequence); +} + +void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +{ + string packageName; + SchemaClassKey key; + + uint8_t kind = inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + "), replyTo=" << replyToKey); + + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + ClassMap::iterator cIter = pIter->second.find(key); + if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint32_t sequence = nextRequestSequence++; + + encodeHeader (outBuffer, 'S', sequence); + outBuffer.putShortString(packageName); + outBuffer.putShortString(key.name); + outBuffer.putBin128(key.hash); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + "), to=" << replyToKey << " seq=" << sequence); + + if (cIter != pIter->second.end()) + pIter->second.erase(key); + + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence))); + } +} + +void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) +{ + // If the management package is attached locally (embedded in the broker or + // linked in via plug-in), call the schema handler directly. If the package + // is from a remote management agent, send the stored schema information. + + if (writeSchemaCall != 0) + writeSchemaCall(buf); + else + buf.putRawData(buffer, bufferLen); +} + +void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + + QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + "), replyTo=" << replyToKey << " seq=" << sequence); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + SchemaClass& classInfo = cIter->second; + + if (classInfo.hasSchema()) { + encodeHeader(outBuffer, 's', sequence); + classInfo.appendSchema(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); + } + else + sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); + } + else + sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); + } + else + sendCommandComplete(replyToKey, sequence, 1, "Package not found"); +} + +void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.record(); + inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + inBuffer.restore(); + + QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { + size_t length = validateSchema(inBuffer, cIter->second.kind); + if (length == 0) { + QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); + cMap.erase(key); + } else { + cIter->second.buffer = (uint8_t*) malloc(length); + cIter->second.bufferLen = length; + inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); + + // Publish a class-indication message + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, "schema.class"); + QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << + " to=schema.class"); + } + } + } +} + +bool ManagementAgent::bankInUse (uint32_t bank) +{ + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) + if (aIter->second->agentBank == bank) + return true; + return false; +} + +uint32_t ManagementAgent::allocateNewBank () +{ + while (bankInUse (nextRemoteBank)) + nextRemoteBank++; + + uint32_t allocated = nextRemoteBank++; + writeData (); + return allocated; +} + +uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank) +{ + if (requestedBank == 0 || bankInUse (requestedBank)) + return allocateNewBank (); + return requestedBank; +} + +void ManagementAgent::deleteOrphanedAgentsLH() +{ + vector<ObjectId> deleteList; + + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { + ObjectId connectionRef = aIter->first; + bool found = false; + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + if (iter->first == connectionRef && !iter->second->isDeleted()) { + found = true; + break; + } + } + + if (!found) { + deleteList.push_back(connectionRef); + delete aIter->second; + } + } + + for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) + remoteAgents.erase(*dIter); + + deleteList.clear(); +} + +void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) +{ + string label; + uint32_t requestedBrokerBank, requestedAgentBank; + uint32_t assignedBank; + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + Uuid systemId; + + moveNewObjectsLH(); + deleteOrphanedAgentsLH(); + RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); + if (aIter != remoteAgents.end()) { + // There already exists an agent on this session. Reject the request. + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); + return; + } + + inBuffer.getShortString(label); + systemId.decode(inBuffer); + requestedBrokerBank = inBuffer.getLong(); + requestedAgentBank = inBuffer.getLong(); + + QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << + " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence); + + assignedBank = assignBankLH(requestedAgentBank); + + RemoteAgent* agent = new RemoteAgent(*this); + agent->brokerBank = brokerBank; + agent->agentBank = assignedBank; + agent->routingKey = replyToKey; + agent->connectionRef = connectionRef; + agent->mgmtObject = new _qmf::Agent (this, agent); + agent->mgmtObject->set_connectionRef(agent->connectionRef); + agent->mgmtObject->set_label (label); + agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); + agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_brokerBank (brokerBank); + agent->mgmtObject->set_agentBank (assignedBank); + addObject (agent->mgmtObject, 0, true); + + remoteAgents[connectionRef] = agent; + + QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); + + // Send an Attach Response + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (brokerBank); + outBuffer.putLong (assignedBank); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << + " to=" << replyToKey << " seq=" << sequence); +} + +void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + ft.decode(inBuffer); + + QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence); + + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo<string>()) { + value = ft.get("_objectid"); + if (value.get() == 0 || !value->convertsTo<string>()) + return; + + ObjectId selector(value->get<string>()); + ManagementObjectMap::iterator iter = managementObjects.find(selector); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + } + } + sendCommandComplete(replyToKey, sequence); + return; + } + + string className (value->get<string>()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName () == className) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + } + } + } + + sendCommandComplete(replyToKey, sequence); +} + +bool ManagementAgent::authorizeAgentMessageLH(Message& msg) +{ + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + + if (msg.encodedSize() > MA_BUFFER_SIZE) + return false; + + msg.encodeContent(inBuffer); + inBuffer.reset(); + + if (!checkHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + // TODO: check method call against ACL list. + AclModule* acl = broker->getAcl(); + if (acl == 0) + return true; + + string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); + string packageName; + string className; + uint8_t hash[16]; + string methodName; + + map<acl::Property, string> params; + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + params[acl::PROP_SCHEMAPACKAGE] = packageName; + params[acl::PROP_SCHEMACLASS] = className; + + if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) + return true; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) + } + + return false; + } + + return true; +} + +void ManagementAgent::dispatchAgentCommandLH(Message& msg) +{ + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + } + else + return; + + if (msg.encodedSize() > MA_BUFFER_SIZE) { + QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << + msg.encodedSize()); + return; + } + + msg.encodeContent(inBuffer); + uint32_t bufferLen = inBuffer.getPosition(); + inBuffer.reset(); + + while (inBuffer.getPosition() < bufferLen) { + if (!checkHeader(inBuffer, &opcode, &sequence)) + return; + + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); + } +} + +ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert(pair<string, ClassMap>(name, ClassMap())); + QPID_LOG (debug, "ManagementAgent added package " << name); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'p'); + encodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, mExchange, "schema.package"); + QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package") + + return result.first; +} + +void ManagementAgent::addClassLH(uint8_t kind, + PackageMap::iterator pIter, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy(&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" << + key.name); + + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); + cIter = cMap.find(key); +} + +void ManagementAgent::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString((*pIter).first); +} + +void ManagementAgent::encodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putOctet((*cIter).second.kind); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128(key.hash); +} + +size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) +{ + if (kind == ManagementItem::CLASS_KIND_TABLE) + return validateTableSchema(inBuffer); + else if (kind == ManagementItem::CLASS_KIND_EVENT) + return validateEventSchema(inBuffer); + return 0; +} + +size_t ManagementAgent::validateTableSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_TABLE) + return 0; + + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint8_t superType = 0; //inBuffer.getOctet(); + + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); + + if (superType == 1) { + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + } + + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getAsInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } + } catch (exception& /*e*/) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} + +size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_EVENT) + return 0; + + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint8_t superType = inBuffer.getOctet(); + + uint16_t argCount = inBuffer.getShort(); + + if (superType == 1) { + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + } + for (uint16_t idx = 0; idx < argCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + } catch (exception& /*e*/) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} + +void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) +{ + Mutex::ScopedLock lock (addLock); + allocator = a; +} + +uint64_t ManagementAgent::allocateId(Manageable* object) +{ + Mutex::ScopedLock lock (addLock); + if (allocator.get()) return allocator->getIdFor(object); + return 0; +} + +void ManagementAgent::disallow(const std::string& className, const std::string& methodName, const std::string& message) { + disallowed[std::make_pair(className, methodName)] = message; +} |