summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/agent/ManagementAgentImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp881
1 files changed, 650 insertions, 231 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index ebdc71e3b1..f84e158154 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -20,16 +20,25 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementObject.h"
-#include "ManagementAgentImpl.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/PipeHandle.h"
+#include "qpid/agent/ManagementAgentImpl.h"
#include <list>
-#include <unistd.h>
#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <iostream>
+#include <fstream>
+
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::sys;
+using namespace std;
using std::stringstream;
+using std::ofstream;
+using std::ifstream;
using std::string;
using std::cout;
using std::endl;
@@ -66,128 +75,274 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
return agent;
}
+const string ManagementAgentImpl::storeMagicNumber("MA02");
+
ManagementAgentImpl::ManagementAgentImpl() :
- clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false)
+ interval(10), extThread(false), pipeHandle(0),
+ initialized(false), connected(false), lastFailure("never connected"),
+ clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
+ assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
+ connThreadBody(*this), connThread(connThreadBody),
+ pubThreadBody(*this), pubThread(pubThreadBody)
{
- // TODO: Establish system ID
}
-void ManagementAgentImpl::init(std::string brokerHost,
- uint16_t brokerPort,
- uint16_t intervalSeconds,
- bool useExternalThread)
+ManagementAgentImpl::~ManagementAgentImpl()
{
+ // shutdown & cleanup all threads
+ connThreadBody.close();
+ pubThreadBody.close();
+
+ connThread.join();
+ pubThread.join();
+
+ // Release the memory associated with stored management objects.
{
Mutex::ScopedLock lock(agentLock);
- startupWait = true;
+
+ moveNewObjectsLH();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++) {
+ ManagementObject* object = iter->second;
+ delete object;
+ }
+ managementObjects.clear();
+ }
+ if (pipeHandle) {
+ delete pipeHandle;
+ pipeHandle = 0;
}
+}
+void ManagementAgentImpl::init(const string& brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread,
+ const string& _storeFile,
+ const string& uid,
+ const string& pwd,
+ const string& mech,
+ const string& proto)
+{
+ client::ConnectionSettings settings;
+ settings.protocol = proto;
+ settings.host = brokerHost;
+ settings.port = brokerPort;
+ settings.username = uid;
+ settings.password = pwd;
+ settings.mechanism = mech;
+ init(settings, intervalSeconds, useExternalThread, _storeFile);
+}
+
+void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
+ uint16_t intervalSeconds,
+ bool useExternalThread,
+ const std::string& _storeFile)
+{
interval = intervalSeconds;
extThread = useExternalThread;
+ storeFile = _storeFile;
nextObjectId = 1;
- sessionId.generate();
- queueName << "qmfagent-" << sessionId;
- string dest = "qmfagent";
+ QPID_LOG(info, "QMF Agent Initialized: broker=" << settings.host << ":" << settings.port <<
+ " interval=" << intervalSeconds << " storeFile=" << _storeFile);
+ connectionSettings = settings;
- connection.open(brokerHost.c_str(), brokerPort);
- session = connection.newSession (queueName.str());
- dispatcher = new client::Dispatcher(session);
+ // TODO: Abstract the socket calls for portability
+ // qpid::sys::PipeHandle to create a pipe
+ if (extThread) {
+ pipeHandle = new PipeHandle(true);
+ }
+ retrieveData();
+ bootSequence++;
+ if ((bootSequence & 0xF000) != 0)
+ bootSequence = 1;
+ storeData(true);
- session.queueDeclare (arg::queue=queueName.str());
- session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
- arg::bindingKey=queueName.str());
- session.messageSubscribe (arg::queue=queueName.str(),
- arg::destination=dest);
- session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
- session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
+ initialized = true;
+}
- Message attachRequest;
- char rawbuffer[512];
- Buffer buffer (rawbuffer, 512);
+void ManagementAgentImpl::registerClass(const string& packageName,
+ const string& className,
+ uint8_t* md5Sum,
+ qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
+}
- attachRequest.getDeliveryProperties().setRoutingKey("broker");
- attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+void ManagementAgentImpl::registerEvent(const string& packageName,
+ const string& eventName,
+ uint8_t* md5Sum,
+ qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
+}
- EncodeHeader (buffer, 'A');
- buffer.putShortString ("RemoteAgent [C++]");
- systemId.encode (buffer);
- buffer.putLong (11);
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+ uint64_t persistId)
+{
+ Mutex::ScopedLock lock(addLock);
+ uint16_t sequence = persistId ? 0 : bootSequence;
+ uint64_t objectNum = persistId ? persistId : nextObjectId++;
- size_t length = 512 - buffer.available ();
- string stringBuffer (rawbuffer, length);
- attachRequest.setData (stringBuffer);
+ ObjectId objectId(&attachment, 0, sequence, objectNum);
- session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management");
+ // TODO: fix object-id handling
+ object->setObjectId(objectId);
+ newManagementObjects[objectId] = object;
+ return objectId;
+}
- dispatcher->listen(dest, this);
- dispatcher->start();
+void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity)
+{
+ Mutex::ScopedLock lock(agentLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+ stringstream key;
- {
- Mutex::ScopedLock lock(agentLock);
- if (startupWait)
- startupCond.wait(agentLock);
- }
+ key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+ event.getPackageName() << "." << event.getEventName();
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ outBuffer.putOctet(sev);
+ event.encode(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str());
}
-ManagementAgentImpl::~ManagementAgentImpl()
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
{
- dispatcher->stop();
- session.close();
- delete dispatcher;
+ Mutex::ScopedLock lock(agentLock);
+
+ for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
+ if (methodQueue.empty())
+ break;
+
+ QueuedMethod* item = methodQueue.front();
+ methodQueue.pop_front();
+ {
+ Mutex::ScopedUnlock unlock(agentLock);
+ Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
+ invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+ delete item;
+ }
+ }
+
+ char rbuf[100];
+ while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
+ return methodQueue.size();
}
-void ManagementAgentImpl::RegisterClass (std::string packageName,
- std::string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
-{
- Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
+int ManagementAgentImpl::getSignalFd(void)
+{
+ return pipeHandle->getReadHandle();
}
-uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
- uint32_t /*persistId*/,
- uint32_t /*persistBank*/)
+void ManagementAgentImpl::startProtocol()
{
- Mutex::ScopedLock lock(addLock);
- uint64_t objectId;
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ connected = true;
+ encodeHeader(buffer, 'A');
+ buffer.putShortString("RemoteAgent [C++]");
+ systemId.encode (buffer);
+ buffer.putLong(requestedBrokerBank);
+ buffer.putLong(requestedAgentBank);
+ uint32_t length = buffer.getPosition();
+ buffer.reset();
+ connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
+ QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
+ " reqAgent=" << requestedAgentBank);
+}
- // TODO: fix object-id handling
- objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF);
- object->setObjectId (objectId);
- newManagementObjects[objectId] = object;
- return objectId;
+void ManagementAgentImpl::storeData(bool requested)
+{
+ if (!storeFile.empty()) {
+ ofstream outFile(storeFile.c_str());
+ uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank;
+ uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank;
+
+ if (outFile.good()) {
+ outFile << storeMagicNumber << " " << brokerBankToWrite << " " <<
+ agentBankToWrite << " " << bootSequence << endl;
+ outFile.close();
+ }
+ }
}
-uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/)
+void ManagementAgentImpl::retrieveData()
{
- return 0;
+ if (!storeFile.empty()) {
+ ifstream inFile(storeFile.c_str());
+ string mn;
+
+ if (inFile.good()) {
+ inFile >> mn;
+ if (mn == storeMagicNumber) {
+ inFile >> requestedBrokerBank;
+ inFile >> requestedAgentBank;
+ inFile >> bootSequence;
+ }
+ inFile.close();
+ }
+ }
}
-int ManagementAgentImpl::getSignalFd(void)
+void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
+ uint32_t code, string text)
{
- return -1;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'z', sequence);
+ outBuffer.putLong(code);
+ outBuffer.putShortString(text);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
+ QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
}
void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock lock(agentLock);
- uint32_t assigned;
- stringstream key;
- assigned = inBuffer.getLong();
- objIdPrefix = ((uint64_t) assigned) << 24;
+ assignedBrokerBank = inBuffer.getLong();
+ assignedAgentBank = inBuffer.getLong();
- startupWait = false;
- startupCond.notify();
+ QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
+
+ if ((assignedBrokerBank != requestedBrokerBank) ||
+ (assignedAgentBank != requestedAgentBank)) {
+ if (requestedAgentBank == 0) {
+ QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
+ assignedAgentBank);
+ } else {
+ QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
+ "." << assignedAgentBank);
+ }
+ storeData();
+ requestedBrokerBank = assignedBrokerBank;
+ requestedAgentBank = assignedAgentBank;
+ }
+
+ attachment.setBanks(assignedBrokerBank, assignedAgentBank);
// Bind to qpid.management to receive commands
- key << "agent." << assigned;
- session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(),
- arg::bindingKey=key.str());
+ connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
// Send package indications for all local packages
for (PackageMap::iterator pIter = packages.begin();
@@ -196,21 +351,21 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'p');
- EncodePackageIndication(outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ encodeHeader(outBuffer, 'p');
+ encodePackageIndication(outBuffer, pIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
// Send class indications for all local classes
ClassMap cMap = pIter->second;
for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
outBuffer.reset();
- EncodeHeader(outBuffer, 'q');
- EncodeClassIndication(outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ encodeHeader(outBuffer, 'q');
+ encodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -225,20 +380,24 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
+ QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
+
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
- ClassMap cMap = pIter->second;
+ ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- SchemaClass schema = cIter->second;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ SchemaClass& schema = cIter->second;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 's', sequence);
+ schema.writeSchemaCall(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+
+ QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
}
}
}
@@ -247,30 +406,134 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
{
Mutex::ScopedLock lock(agentLock);
clientWasAdded = true;
+
+ QPID_LOG(trace, "RCVD ConsoleAddedInd");
}
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
{
string methodName;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- uint64_t objId = inBuffer.getLongLong();
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ if ((iter->second->getPackageName() != packageName) ||
+ (iter->second->getClassName() != className)) {
+ outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
+ outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
+ }
+ else
+ try {
+ outBuffer.record();
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ } catch(exception& e) {
+ outBuffer.restore();
+ outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+ outBuffer.putMediumString(e.what());
+ }
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+}
+
+void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ moveNewObjectsLH();
+
+ ft.decode(inBuffer);
+
+ QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+
+ value = ft.get("_class");
+ if (value.get() == 0 || !value->convertsTo<string>()) {
+ value = ft.get("_objectid");
+ if (value.get() == 0 || !value->convertsTo<string>())
+ return;
+
+ ObjectId selector(value->get<string>());
+ ManagementObjectMap::iterator iter = managementObjects.find(selector);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ encodeHeader(outBuffer, 'g', sequence);
+ object->writeProperties(outBuffer);
+ object->writeStatistics(outBuffer, true);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+
+ QPID_LOG(trace, "SENT ObjectInd");
+ }
+ sendCommandComplete(replyTo, sequence);
+ return;
+ }
+
+ string className(value->get<string>());
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className) {
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ encodeHeader(outBuffer, 'g', sequence);
+ object->writeProperties(outBuffer);
+ object->writeStatistics(outBuffer, true);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+
+ QPID_LOG(trace, "SENT ObjectInd");
+ }
+ }
+
+ sendCommandComplete(replyTo, sequence);
+}
+
+void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ if (extThread) {
+ Mutex::ScopedLock lock(agentLock);
+ string body;
+
+ inBuffer.getRawData(body, inBuffer.available());
+ methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+ pipeHandle->write("X", 1);
+ } else {
+ invokeMethodRequest(inBuffer, sequence, replyTo);
+ }
+
+ QPID_LOG(trace, "RCVD MethodRequest");
}
void ManagementAgentImpl::received(Message& msg)
@@ -287,215 +550,371 @@ void ManagementAgentImpl::received(Message& msg)
replyToKey = rt.getRoutingKey();
}
- if (CheckHeader (inBuffer, &opcode, &sequence))
+ if (checkHeader(inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
+ else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
}
}
-void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
{
- buf.putOctet ('A');
- buf.putOctet ('M');
- buf.putOctet ('1');
- buf.putOctet (opcode);
- buf.putLong (seq);
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('2');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
}
-bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
if (buf.getSize() < 8)
return false;
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
- return h1 == 'A' && h2 == 'M' && h3 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementAgentImpl::SendBuffer (Buffer& buf,
- uint32_t length,
- string exchange,
- string routingKey)
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name)
{
- Message msg;
- string data;
-
- if (objIdPrefix == 0)
- return;
-
- buf.getRawData(data, length);
- msg.getDeliveryProperties().setRoutingKey(routingKey);
- msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData (data);
- session.messageTransfer (arg::content=msg, arg::destination=exchange);
-}
-
-ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name)
-{
- PackageMap::iterator pIter = packages.find (name);
- if (pIter != packages.end ())
+ PackageMap::iterator pIter = packages.find(name);
+ if (pIter != packages.end())
return pIter;
// No such package found, create a new map entry.
- std::pair<PackageMap::iterator, bool> result =
- packages.insert (std::pair<string, ClassMap> (name, ClassMap ()));
+ pair<PackageMap::iterator, bool> result =
+ packages.insert(pair<string, ClassMap>(name, ClassMap()));
- // Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ if (connected) {
+ // Publish a package-indication message
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package");
+ encodeHeader(outBuffer, 'p');
+ encodePackageIndication(outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "schema.package");
+ }
return result.first;
}
void ManagementAgentImpl::moveNewObjectsLH()
{
- Mutex::ScopedLock lock (addLock);
- for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
- iter != newManagementObjects.end ();
+ Mutex::ScopedLock lock(addLock);
+ for (ManagementObjectMap::iterator iter = newManagementObjects.begin();
+ iter != newManagementObjects.end();
iter++)
managementObjects[iter->first] = iter->second;
newManagementObjects.clear();
}
-void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementAgentImpl::addClassLocal(uint8_t classKind,
+ PackageMap::iterator pIter,
+ const string& className,
+ uint8_t* md5Sum,
+ qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
key.name = className;
- memcpy (&key.hash, md5Sum, 16);
+ memcpy(&key.hash, md5Sum, 16);
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
return;
// No such class found, create a new class with local information.
- SchemaClass classInfo;
-
- classInfo.writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind)));
}
-void ManagementAgentImpl::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
+void ManagementAgentImpl::encodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
{
- buf.putShortString ((*pIter).first);
+ buf.putShortString((*pIter).first);
+
+ QPID_LOG(trace, "SENT PackageInd: package=" << (*pIter).first);
}
-void ManagementAgentImpl::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
+void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
+ buf.putOctet((*cIter).second.kind);
+ buf.putShortString((*pIter).first);
+ buf.putShortString(key.name);
+ buf.putBin128(key.hash);
+
+ QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name);
}
-void ManagementAgentImpl::PeriodicProcessing()
+void ManagementAgentImpl::periodicProcessing()
{
#define BUFSIZE 65536
Mutex::ScopedLock lock(agentLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
- string routingKey;
- std::list<uint64_t> deleteList;
+ list<pair<ObjectId, ManagementObject*> > deleteList;
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
+ if (!connected)
+ return;
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + systemId.str() + ".heartbeat";
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ moveNewObjectsLH();
+
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ object->setFlags(0);
+ if (clientWasAdded) {
+ object->setForcePublish(true);
+ }
}
- moveNewObjectsLH();
+ clientWasAdded = false;
+
+ //
+ // Process the entire object map.
+ //
+ for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second;
+
+ //
+ // Skip until we find a base object requiring a sent message.
+ //
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->getForcePublish() &&
+ !baseObject->isDeleted()))
+ continue;
- if (clientWasAdded)
- {
- clientWasAdded = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ for (ManagementObjectMap::iterator iter = baseIter;
+ iter != managementObjects.end();
+ iter++) {
ManagementObject* object = iter->second;
- object->setAllChanged ();
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+ }
+
+ if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+ encodeHeader(msgBuffer, 'i');
+ object->writeStatistics(msgBuffer);
+ }
+
+ if (object->isDeleted())
+ deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
+ object->setForcePublish(false);
+
+ if (msgBuffer.available() < (BUFSIZE / 2))
+ break;
+ }
+ }
+
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+ baseObject->getPackageName() << "." << baseObject->getClassName();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
}
}
- if (managementObjects.empty ())
- return;
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
+ // Delete flagged objects
+ for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++) {
+ delete iter->second;
+ managementObjects.erase(iter->first);
+ }
+
+ deleteList.clear();
+
{
- ManagementObject* object = iter->second;
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(uint64_t(Duration(now())));
+ stringstream key;
+ key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
- if (object->getConfigChanged () || object->isDeleted ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
- object->writeProperties(msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
+ }
+}
+
+void ManagementAgentImpl::ConnectionThread::run()
+{
+ static const int delayMin(1);
+ static const int delayMax(128);
+ static const int delayFactor(2);
+ int delay(delayMin);
+ string dest("qmfagent");
+ ConnectionThread::shared_ptr tmp;
+
+ sessionId.generate();
+ queueName << "qmfagent-" << sessionId;
+
+ while (true) {
+ try {
+ if (agent.initialized) {
+ QPID_LOG(debug, "QMF Agent attempting to connect to the broker...");
+ connection.open(agent.connectionSettings);
+ session = connection.newSession(queueName.str());
+ subscriptions.reset(new client::SubscriptionManager(session));
+
+ session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
+ arg::exclusive=true);
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str());
+
+ subscriptions->subscribe(agent, queueName.str(), dest);
+ QPID_LOG(info, "Connection established with broker");
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (shutdown)
+ return;
+ operational = true;
+ agent.startProtocol();
+ try {
+ Mutex::ScopedUnlock _unlock(connLock);
+ subscriptions->run();
+ } catch (exception) {}
+
+ QPID_LOG(warning, "Connection to the broker has been lost");
+
+ operational = false;
+ agent.connected = false;
+ tmp = subscriptions;
+ subscriptions.reset();
+ }
+ tmp.reset(); // frees the subscription outside the lock
+ delay = delayMin;
+ connection.close();
+ }
+ } catch (exception &e) {
+ if (delay < delayMax)
+ delay *= delayFactor;
+ QPID_LOG(debug, "Connection failed: exception=" << e.what());
}
-
- if (object->getInstChanged ())
+
{
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ // sleep for "delay" seconds, but peridically check if the
+ // agent is shutting down so we don't hang for up to delayMax
+ // seconds during agent shutdown
+ Mutex::ScopedLock _lock(connLock);
+ if (shutdown)
+ return;
+ sleeping = true;
+ int totalSleep = 0;
+ do {
+ Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delayMin);
+ totalSleep += delayMin;
+ } while (totalSleep < delay && !shutdown);
+ sleeping = false;
+ if (shutdown)
+ return;
}
+ }
+}
+
+ManagementAgentImpl::ConnectionThread::~ConnectionThread()
+{
+}
- if (object->isDeleted ())
- deleteList.push_back (iter->first);
+void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
+ uint32_t length,
+ const string& exchange,
+ const string& routingKey)
+{
+ ConnectionThread::shared_ptr s;
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ s = subscriptions;
}
- // Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
- iter != deleteList.rend ();
- iter++)
- managementObjects.erase (*iter);
+ Message msg;
+ string data;
- deleteList.clear ();
+ buf.getRawData(data, length);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.setData(data);
+ try {
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ } catch(exception& e) {
+ QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
+ // Bounce the connection
+ if (s)
+ s->stop();
+ }
}
-void ManagementAgentImpl::BackgroundThread::run()
+void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
{
- while (true) {
- ::sleep(5);
- agent.PeriodicProcessing();
+ stringstream key;
+ key << "agent." << brokerBank << "." << agentBank;
+ session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
+ arg::bindingKey=key.str());
+}
+
+void ManagementAgentImpl::ConnectionThread::close()
+{
+ ConnectionThread::shared_ptr s;
+ {
+ Mutex::ScopedLock _lock(connLock);
+ shutdown = true;
+ s = subscriptions;
+ }
+ if (s)
+ s->stop();
+}
+
+bool ManagementAgentImpl::ConnectionThread::isSleeping() const
+{
+ Mutex::ScopedLock _lock(connLock);
+ return sleeping;
+}
+
+
+void ManagementAgentImpl::PublishThread::run()
+{
+ uint16_t totalSleep;
+
+ while (!shutdown) {
+ agent.periodicProcessing();
+ totalSleep = 0;
+ while (totalSleep++ < agent.getInterval() && !shutdown) {
+ ::sleep(1);
+ }
}
}