diff options
Diffstat (limited to 'cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 881 |
1 files changed, 650 insertions, 231 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index ebdc71e3b1..f84e158154 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -20,16 +20,25 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" -#include "ManagementAgentImpl.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/PipeHandle.h" +#include "qpid/agent/ManagementAgentImpl.h" #include <list> -#include <unistd.h> #include <string.h> +#include <stdlib.h> +#include <sys/types.h> +#include <iostream> +#include <fstream> + using namespace qpid::client; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; +using namespace std; using std::stringstream; +using std::ofstream; +using std::ifstream; using std::string; using std::cout; using std::endl; @@ -66,128 +75,274 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() return agent; } +const string ManagementAgentImpl::storeMagicNumber("MA02"); + ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) + interval(10), extThread(false), pipeHandle(0), + initialized(false), connected(false), lastFailure("never connected"), + clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), + assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), + connThreadBody(*this), connThread(connThreadBody), + pubThreadBody(*this), pubThread(pubThreadBody) { - // TODO: Establish system ID } -void ManagementAgentImpl::init(std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +ManagementAgentImpl::~ManagementAgentImpl() { + // shutdown & cleanup all threads + connThreadBody.close(); + pubThreadBody.close(); + + connThread.join(); + pubThread.join(); + + // Release the memory associated with stored management objects. { Mutex::ScopedLock lock(agentLock); - startupWait = true; + + moveNewObjectsLH(); + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + delete object; + } + managementObjects.clear(); + } + if (pipeHandle) { + delete pipeHandle; + pipeHandle = 0; } +} +void ManagementAgentImpl::init(const string& brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread, + const string& _storeFile, + const string& uid, + const string& pwd, + const string& mech, + const string& proto) +{ + client::ConnectionSettings settings; + settings.protocol = proto; + settings.host = brokerHost; + settings.port = brokerPort; + settings.username = uid; + settings.password = pwd; + settings.mechanism = mech; + init(settings, intervalSeconds, useExternalThread, _storeFile); +} + +void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, + uint16_t intervalSeconds, + bool useExternalThread, + const std::string& _storeFile) +{ interval = intervalSeconds; extThread = useExternalThread; + storeFile = _storeFile; nextObjectId = 1; - sessionId.generate(); - queueName << "qmfagent-" << sessionId; - string dest = "qmfagent"; + QPID_LOG(info, "QMF Agent Initialized: broker=" << settings.host << ":" << settings.port << + " interval=" << intervalSeconds << " storeFile=" << _storeFile); + connectionSettings = settings; - connection.open(brokerHost.c_str(), brokerPort); - session = connection.newSession (queueName.str()); - dispatcher = new client::Dispatcher(session); + // TODO: Abstract the socket calls for portability + // qpid::sys::PipeHandle to create a pipe + if (extThread) { + pipeHandle = new PipeHandle(true); + } + retrieveData(); + bootSequence++; + if ((bootSequence & 0xF000) != 0) + bootSequence = 1; + storeData(true); - session.queueDeclare (arg::queue=queueName.str()); - session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str()); - session.messageSubscribe (arg::queue=queueName.str(), - arg::destination=dest); - session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); - session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); + initialized = true; +} - Message attachRequest; - char rawbuffer[512]; - Buffer buffer (rawbuffer, 512); +void ManagementAgentImpl::registerClass(const string& packageName, + const string& className, + uint8_t* md5Sum, + qpid::management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} - attachRequest.getDeliveryProperties().setRoutingKey("broker"); - attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); +void ManagementAgentImpl::registerEvent(const string& packageName, + const string& eventName, + uint8_t* md5Sum, + qpid::management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); +} - EncodeHeader (buffer, 'A'); - buffer.putShortString ("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong (11); +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + uint64_t persistId) +{ + Mutex::ScopedLock lock(addLock); + uint16_t sequence = persistId ? 0 : bootSequence; + uint64_t objectNum = persistId ? persistId : nextObjectId++; - size_t length = 512 - buffer.available (); - string stringBuffer (rawbuffer, length); - attachRequest.setData (stringBuffer); + ObjectId objectId(&attachment, 0, sequence, objectNum); - session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); + // TODO: fix object-id handling + object->setObjectId(objectId); + newManagementObjects[objectId] = object; + return objectId; +} - dispatcher->listen(dest, this); - dispatcher->start(); +void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity) +{ + Mutex::ScopedLock lock(agentLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; + stringstream key; - { - Mutex::ScopedLock lock(agentLock); - if (startupWait) - startupCond.wait(agentLock); - } + key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << + event.getPackageName() << "." << event.getEventName(); + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putOctet(sev); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str()); } -ManagementAgentImpl::~ManagementAgentImpl() +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) { - dispatcher->stop(); - session.close(); - delete dispatcher; + Mutex::ScopedLock lock(agentLock); + + for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) { + if (methodQueue.empty()) + break; + + QueuedMethod* item = methodQueue.front(); + methodQueue.pop_front(); + { + Mutex::ScopedUnlock unlock(agentLock); + Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size()); + invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + delete item; + } + } + + char rbuf[100]; + while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes + return methodQueue.size(); } -void ManagementAgentImpl::RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock(agentLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); +int ManagementAgentImpl::getSignalFd(void) +{ + return pipeHandle->getReadHandle(); } -uint64_t ManagementAgentImpl::addObject (ManagementObject* object, - uint32_t /*persistId*/, - uint32_t /*persistBank*/) +void ManagementAgentImpl::startProtocol() { - Mutex::ScopedLock lock(addLock); - uint64_t objectId; + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + connected = true; + encodeHeader(buffer, 'A'); + buffer.putShortString("RemoteAgent [C++]"); + systemId.encode (buffer); + buffer.putLong(requestedBrokerBank); + buffer.putLong(requestedAgentBank); + uint32_t length = buffer.getPosition(); + buffer.reset(); + connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); + QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << + " reqAgent=" << requestedAgentBank); +} - // TODO: fix object-id handling - objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF); - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; +void ManagementAgentImpl::storeData(bool requested) +{ + if (!storeFile.empty()) { + ofstream outFile(storeFile.c_str()); + uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank; + uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank; + + if (outFile.good()) { + outFile << storeMagicNumber << " " << brokerBankToWrite << " " << + agentBankToWrite << " " << bootSequence << endl; + outFile.close(); + } + } } -uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) +void ManagementAgentImpl::retrieveData() { - return 0; + if (!storeFile.empty()) { + ifstream inFile(storeFile.c_str()); + string mn; + + if (inFile.good()) { + inFile >> mn; + if (mn == storeMagicNumber) { + inFile >> requestedBrokerBank; + inFile >> requestedAgentBank; + inFile >> bootSequence; + } + inFile.close(); + } + } } -int ManagementAgentImpl::getSignalFd(void) +void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, + uint32_t code, string text) { - return -1; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'z', sequence); + outBuffer.putLong(code); + outBuffer.putShortString(text); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); + QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); } void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); - uint32_t assigned; - stringstream key; - assigned = inBuffer.getLong(); - objIdPrefix = ((uint64_t) assigned) << 24; + assignedBrokerBank = inBuffer.getLong(); + assignedAgentBank = inBuffer.getLong(); - startupWait = false; - startupCond.notify(); + QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); + + if ((assignedBrokerBank != requestedBrokerBank) || + (assignedAgentBank != requestedAgentBank)) { + if (requestedAgentBank == 0) { + QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << + assignedAgentBank); + } else { + QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << + "." << assignedAgentBank); + } + storeData(); + requestedBrokerBank = assignedBrokerBank; + requestedAgentBank = assignedAgentBank; + } + + attachment.setBanks(assignedBrokerBank, assignedAgentBank); // Bind to qpid.management to receive commands - key << "agent." << assigned; - session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), - arg::bindingKey=key.str()); + connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); @@ -196,21 +351,21 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'p'); - EncodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + encodeHeader(outBuffer, 'p'); + encodePackageIndication(outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { outBuffer.reset(); - EncodeHeader(outBuffer, 'q'); - EncodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -225,20 +380,24 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); + QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); + PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { - ClassMap cMap = pIter->second; + ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - SchemaClass schema = cIter->second; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader(outBuffer, 's', sequence); - schema.writeSchemaCall(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + SchemaClass& schema = cIter->second; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 's', sequence); + schema.writeSchemaCall(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + + QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); } } } @@ -247,30 +406,134 @@ void ManagementAgentImpl::handleConsoleAddedIndication() { Mutex::ScopedLock lock(agentLock); clientWasAdded = true; + + QPID_LOG(trace, "RCVD ConsoleAddedInd"); } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) { string methodName; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + string packageName; + string className; + uint8_t hash[16]; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); + } + else + try { + outBuffer.record(); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putMediumString(e.what()); + } } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "amq.direct", replyTo); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + ft.decode(inBuffer); + + QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo<string>()) { + value = ft.get("_objectid"); + if (value.get() == 0 || !value->convertsTo<string>()) + return; + + ObjectId selector(value->get<string>()); + ManagementObjectMap::iterator iter = managementObjects.find(selector); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + + QPID_LOG(trace, "SENT ObjectInd"); + } + sendCommandComplete(replyTo, sequence); + return; + } + + string className(value->get<string>()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName() == className) { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + + QPID_LOG(trace, "SENT ObjectInd"); + } + } + + sendCommandComplete(replyTo, sequence); +} + +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + if (extThread) { + Mutex::ScopedLock lock(agentLock); + string body; + + inBuffer.getRawData(body, inBuffer.available()); + methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + pipeHandle->write("X", 1); + } else { + invokeMethodRequest(inBuffer, sequence, replyTo); + } + + QPID_LOG(trace, "RCVD MethodRequest"); } void ManagementAgentImpl::received(Message& msg) @@ -287,215 +550,371 @@ void ManagementAgentImpl::received(Message& msg) replyToKey = rt.getRoutingKey(); } - if (CheckHeader (inBuffer, &opcode, &sequence)) + if (checkHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } -void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('1'); - buf.putOctet (opcode); - buf.putLong (seq); + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('2'); + buf.putOctet(opcode); + buf.putLong (seq); } -bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) return false; - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); - *opcode = buf.getOctet (); - *seq = buf.getLong (); + *opcode = buf.getOctet(); + *seq = buf.getLong(); - return h1 == 'A' && h2 == 'M' && h3 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementAgentImpl::SendBuffer (Buffer& buf, - uint32_t length, - string exchange, - string routingKey) +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name) { - Message msg; - string data; - - if (objIdPrefix == 0) - return; - - buf.getRawData(data, length); - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData (data); - session.messageTransfer (arg::content=msg, arg::destination=exchange); -} - -ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name) -{ - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) + PackageMap::iterator pIter = packages.find(name); + if (pIter != packages.end()) return pIter; // No such package found, create a new map entry. - std::pair<PackageMap::iterator, bool> result = - packages.insert (std::pair<string, ClassMap> (name, ClassMap ())); + pair<PackageMap::iterator, bool> result = + packages.insert(pair<string, ClassMap>(name, ClassMap())); - // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + if (connected) { + // Publish a package-indication message + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package"); + encodeHeader(outBuffer, 'p'); + encodePackageIndication(outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "schema.package"); + } return result.first; } void ManagementAgentImpl::moveNewObjectsLH() { - Mutex::ScopedLock lock (addLock); - for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); - iter != newManagementObjects.end (); + Mutex::ScopedLock lock(addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin(); + iter != newManagementObjects.end(); iter++) managementObjects[iter->first] = iter->second; newManagementObjects.clear(); } -void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) +void ManagementAgentImpl::addClassLocal(uint8_t classKind, + PackageMap::iterator pIter, + const string& className, + uint8_t* md5Sum, + qpid::management::ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. - SchemaClass classInfo; - - classInfo.writeSchemaCall = schemaCall; - cMap[key] = classInfo; - - // TODO: Publish a class-indication message + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind))); } -void ManagementAgentImpl::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) +void ManagementAgentImpl::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) { - buf.putShortString ((*pIter).first); + buf.putShortString((*pIter).first); + + QPID_LOG(trace, "SENT PackageInd: package=" << (*pIter).first); } -void ManagementAgentImpl::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) +void ManagementAgentImpl::encodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); + buf.putOctet((*cIter).second.kind); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128(key.hash); + + QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name); } -void ManagementAgentImpl::PeriodicProcessing() +void ManagementAgentImpl::periodicProcessing() { #define BUFSIZE 65536 Mutex::ScopedLock lock(agentLock); char msgChars[BUFSIZE]; uint32_t contentSize; - string routingKey; - std::list<uint64_t> deleteList; + list<pair<ObjectId, ManagementObject*> > deleteList; - { - Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); + if (!connected) + return; - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str() + ".heartbeat"; - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + moveNewObjectsLH(); + + // + // Clear the been-here flag on all objects in the map. + // + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + object->setFlags(0); + if (clientWasAdded) { + object->setForcePublish(true); + } } - moveNewObjectsLH(); + clientWasAdded = false; + + // + // Process the entire object map. + // + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->getForcePublish() && + !baseObject->isDeleted())) + continue; - if (clientWasAdded) - { - clientWasAdded = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { + Buffer msgBuffer(msgChars, BUFSIZE); + for (ManagementObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { ManagementObject* object = iter->second; - object->setAllChanged (); + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + } + + if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + encodeHeader(msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + } + + if (object->isDeleted()) + deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); + object->setForcePublish(false); + + if (msgBuffer.available() < (BUFSIZE / 2)) + break; + } + } + + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { + msgBuffer.reset(); + stringstream key; + key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << + baseObject->getPackageName() << "." << baseObject->getClassName(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); } } - if (managementObjects.empty ()) - return; - - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) + // Delete flagged objects + for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) { + delete iter->second; + managementObjects.erase(iter->first); + } + + deleteList.clear(); + { - ManagementObject* object = iter->second; + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + stringstream key; + key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - if (object->getConfigChanged () || object->isDeleted ()) - { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); - object->writeProperties(msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); + } +} + +void ManagementAgentImpl::ConnectionThread::run() +{ + static const int delayMin(1); + static const int delayMax(128); + static const int delayFactor(2); + int delay(delayMin); + string dest("qmfagent"); + ConnectionThread::shared_ptr tmp; + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + + while (true) { + try { + if (agent.initialized) { + QPID_LOG(debug, "QMF Agent attempting to connect to the broker..."); + connection.open(agent.connectionSettings); + session = connection.newSession(queueName.str()); + subscriptions.reset(new client::SubscriptionManager(session)); + + session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true, + arg::exclusive=true); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str()); + + subscriptions->subscribe(agent, queueName.str(), dest); + QPID_LOG(info, "Connection established with broker"); + { + Mutex::ScopedLock _lock(connLock); + if (shutdown) + return; + operational = true; + agent.startProtocol(); + try { + Mutex::ScopedUnlock _unlock(connLock); + subscriptions->run(); + } catch (exception) {} + + QPID_LOG(warning, "Connection to the broker has been lost"); + + operational = false; + agent.connected = false; + tmp = subscriptions; + subscriptions.reset(); + } + tmp.reset(); // frees the subscription outside the lock + delay = delayMin; + connection.close(); + } + } catch (exception &e) { + if (delay < delayMax) + delay *= delayFactor; + QPID_LOG(debug, "Connection failed: exception=" << e.what()); } - - if (object->getInstChanged ()) + { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); - object->writeStatistics(msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + // sleep for "delay" seconds, but peridically check if the + // agent is shutting down so we don't hang for up to delayMax + // seconds during agent shutdown + Mutex::ScopedLock _lock(connLock); + if (shutdown) + return; + sleeping = true; + int totalSleep = 0; + do { + Mutex::ScopedUnlock _unlock(connLock); + ::sleep(delayMin); + totalSleep += delayMin; + } while (totalSleep < delay && !shutdown); + sleeping = false; + if (shutdown) + return; } + } +} + +ManagementAgentImpl::ConnectionThread::~ConnectionThread() +{ +} - if (object->isDeleted ()) - deleteList.push_back (iter->first); +void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, + uint32_t length, + const string& exchange, + const string& routingKey) +{ + ConnectionThread::shared_ptr s; + { + Mutex::ScopedLock _lock(connLock); + if (!operational) + return; + s = subscriptions; } - // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); - iter++) - managementObjects.erase (*iter); + Message msg; + string data; - deleteList.clear (); + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData(data); + try { + session.messageTransfer(arg::content=msg, arg::destination=exchange); + } catch(exception& e) { + QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); + // Bounce the connection + if (s) + s->stop(); + } } -void ManagementAgentImpl::BackgroundThread::run() +void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) { - while (true) { - ::sleep(5); - agent.PeriodicProcessing(); + stringstream key; + key << "agent." << brokerBank << "." << agentBank; + session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); +} + +void ManagementAgentImpl::ConnectionThread::close() +{ + ConnectionThread::shared_ptr s; + { + Mutex::ScopedLock _lock(connLock); + shutdown = true; + s = subscriptions; + } + if (s) + s->stop(); +} + +bool ManagementAgentImpl::ConnectionThread::isSleeping() const +{ + Mutex::ScopedLock _lock(connLock); + return sleeping; +} + + +void ManagementAgentImpl::PublishThread::run() +{ + uint16_t totalSleep; + + while (!shutdown) { + agent.periodicProcessing(); + totalSleep = 0; + while (totalSleep++ < agent.getInterval() && !shutdown) { + ::sleep(1); + } } } |