summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp')
-rw-r--r--trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp933
1 files changed, 0 insertions, 933 deletions
diff --git a/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
deleted file mode 100644
index 1bdd8ab836..0000000000
--- a/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
+++ /dev/null
@@ -1,933 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementBroker.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
-#include <qpid/broker/MessageDelivery.h>
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/sys/Time.h"
-#include "qpid/broker/ConnectionState.h"
-#include <list>
-#include <iostream>
-#include <fstream>
-
-using boost::intrusive_ptr;
-using qpid::framing::Uuid;
-using namespace qpid::framing;
-using namespace qpid::management;
-using namespace qpid::broker;
-using namespace qpid::sys;
-using namespace std;
-
-Mutex ManagementAgent::Singleton::lock;
-bool ManagementAgent::Singleton::disabled = false;
-ManagementAgent* ManagementAgent::Singleton::agent = 0;
-int ManagementAgent::Singleton::refCount = 0;
-
-ManagementAgent::Singleton::Singleton(bool disableManagement)
-{
- Mutex::ScopedLock _lock(lock);
- if (disableManagement && !disabled) {
- disabled = true;
- assert(refCount == 0); // can't disable after agent has been allocated
- }
- if (refCount == 0 && !disabled)
- agent = new ManagementBroker();
- refCount++;
-}
-
-ManagementAgent::Singleton::~Singleton()
-{
- Mutex::ScopedLock _lock(lock);
- refCount--;
- if (refCount == 0 && !disabled) {
- delete agent;
- agent = 0;
- }
-}
-
-ManagementAgent* ManagementAgent::Singleton::getInstance()
-{
- return agent;
-}
-
-ManagementBroker::RemoteAgent::~RemoteAgent ()
-{
- if (mgmtObject != 0)
- mgmtObject->resourceDestroy();
-}
-
-ManagementBroker::ManagementBroker () :
- threadPoolSize(1), interval(10), broker(0)
-{
- localBank = 5;
- nextObjectId = 1;
- bootSequence = 1;
- nextRemoteBank = 10;
- nextRequestSequence = 1;
- clientWasAdded = false;
-}
-
-ManagementBroker::~ManagementBroker ()
-{
- timer.stop();
- {
- Mutex::ScopedLock lock (userLock);
-
- // Reset the shared pointers to exchanges. If this is not done now, the exchanges
- // will stick around until dExchange and mExchange are implicitely destroyed (long
- // after this destructor completes). Those exchanges hold references to management
- // objects that will be invalid.
- dExchange.reset();
- mExchange.reset();
-
- moveNewObjectsLH();
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++) {
- ManagementObject* object = iter->second;
- delete object;
- }
- managementObjects.clear();
- }
-}
-
-void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads)
-{
- dataDir = _dataDir;
- interval = _interval;
- broker = _broker;
- threadPoolSize = _threads;
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
-
- // Get from file or generate and save to file.
- if (dataDir.empty())
- {
- uuid.generate();
- QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
- << uuid);
- }
- else
- {
- string filename(dataDir + "/.mbrokerdata");
- ifstream inFile(filename.c_str ());
-
- if (inFile.good())
- {
- inFile >> uuid;
- inFile >> bootSequence;
- inFile >> nextRemoteBank;
- inFile.close();
- QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
-
- bootSequence++;
- writeData();
- }
- else
- {
- uuid.generate();
- QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
- writeData();
- }
-
- QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
- }
-}
-
-void ManagementBroker::writeData ()
-{
- string filename (dataDir + "/.mbrokerdata");
- ofstream outFile (filename.c_str ());
-
- if (outFile.good())
- {
- outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
- outFile.close();
- }
-}
-
-void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange,
- broker::Exchange::shared_ptr _dexchange)
-{
- mExchange = _mexchange;
- dExchange = _dexchange;
-}
-
-void ManagementBroker::RegisterClass (string packageName,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- Mutex::ScopedLock lock(userLock);
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
- AddClass(pIter, className, md5Sum, schemaCall);
-}
-
-uint64_t ManagementBroker::addObject (ManagementObject* object,
- uint32_t persistId,
- uint32_t persistBank)
-{
- Mutex::ScopedLock lock (addLock);
- uint64_t objectId;
-
- if (persistId == 0)
- {
- objectId = ((uint64_t) bootSequence) << 48 |
- ((uint64_t) localBank) << 24 | nextObjectId++;
- if ((nextObjectId & 0xFF000000) != 0)
- {
- nextObjectId = 1;
- localBank++;
- }
- }
- else
- objectId = ((uint64_t) persistBank) << 24 | persistId;
-
- object->setObjectId (objectId);
- newManagementObjects[objectId] = object;
- return objectId;
-}
-
-ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
- : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {}
-
-ManagementBroker::Periodic::~Periodic () {}
-
-void ManagementBroker::Periodic::fire ()
-{
- broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
- broker.PeriodicProcessing ();
-}
-
-void ManagementBroker::clientAdded (void)
-{
- Mutex::ScopedLock lock (userLock);
-
- clientWasAdded = true;
- for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
- aIter != remoteAgents.end();
- aIter++) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'x');
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
- }
-}
-
-void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet ('A');
- buf.putOctet ('M');
- buf.putOctet ('1');
- buf.putOctet (opcode);
- buf.putLong (seq);
-}
-
-bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
-
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
-
- return h1 == 'A' && h2 == 'M' && h3 == '1';
-}
-
-void ManagementBroker::SendBuffer (Buffer& buf,
- uint32_t length,
- broker::Exchange::shared_ptr exchange,
- string routingKey)
-{
- if (exchange.get() == 0)
- return;
-
- intrusive_ptr<Message> msg (new Message ());
- AMQFrame method (in_place<MessageTransferBody>(
- ProtocolVersion(), exchange->getName (), 0, 0));
- AMQFrame header (in_place<AMQHeaderBody>());
- AMQFrame content(in_place<AMQContentBody>());
-
- content.castBody<AMQContentBody>()->decode(buf, length);
-
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
-
- MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(length);
- msg->getFrames().append(content);
-
- DeliverableMessage deliverable (msg);
- exchange->route (deliverable, routingKey, 0);
-}
-
-void ManagementBroker::moveNewObjectsLH()
-{
- Mutex::ScopedLock lock (addLock);
- for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
- iter != newManagementObjects.end ();
- iter++)
- managementObjects[iter->first] = iter->second;
- newManagementObjects.clear();
-}
-
-void ManagementBroker::PeriodicProcessing (void)
-{
-#define BUFSIZE 65536
- Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
- uint32_t contentSize;
- string routingKey;
- std::list<uint64_t> deleteList;
-
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + uuid.str() + ".heartbeat";
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
- moveNewObjectsLH();
-
- if (clientWasAdded)
- {
- clientWasAdded = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject* object = iter->second;
- object->setAllChanged ();
- }
- }
-
- if (managementObjects.empty ())
- return;
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject* object = iter->second;
-
- if (object->getConfigChanged () || object->isDeleted ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
- object->writeProperties(msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
- if (object->getInstChanged ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
- if (object->isDeleted ())
- deleteList.push_back (iter->first);
- }
-
- // Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
- iter != deleteList.rend ();
- iter++)
- managementObjects.erase (*iter);
-
- if (!deleteList.empty()) {
- deleteList.clear();
- deleteOrphanedAgentsLH();
- }
-}
-
-void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
- uint32_t code, string text)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'z', sequence);
- outBuffer.putLong (code);
- outBuffer.putShortString (text);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
- const string& routingKey,
- const FieldTable* /*args*/)
-{
- Mutex::ScopedLock lock (userLock);
- Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
-
- // Parse the routing key. This management broker should act as though it
- // is bound to the exchange to match the following keys:
- //
- // agent.<X>.#
- // broker.#
- //
- // where <X> is any non-negative decimal integer less than the lowest remote
- // object-id bank.
-
- if (routingKey == "broker") {
- dispatchAgentCommandLH (msg);
- return false;
- }
-
- else if (routingKey.compare(0, 6, "agent.") == 0) {
- std::string::size_type delim = routingKey.find('.', 6);
- if (delim == string::npos)
- delim = routingKey.length();
- string bank = routingKey.substr(6, delim - 6);
- if ((uint32_t) atoi(bank.c_str()) <= localBank) {
- dispatchAgentCommandLH (msg);
- return false;
- }
- }
-
- return true;
-}
-
-void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- string methodName;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- uint64_t objId = inBuffer.getLongLong();
- inBuffer.getShortString(methodName);
-
- EncodeHeader(outBuffer, 'm', sequence);
-
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
- } else {
- iter->second->doMethod(methodName, inBuffer, outBuffer);
- }
-
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'b', sequence);
- uuid.encode (outBuffer);
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
-{
- for (PackageMap::iterator pIter = packages.begin ();
- pIter != packages.end ();
- pIter++)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'p', sequence);
- EncodePackageIndication (outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
-{
- std::string packageName;
-
- inBuffer.getShortString(packageName);
- FindOrAddPackageLH(packageName);
-}
-
-void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- std::string packageName;
-
- inBuffer.getShortString (packageName);
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
- ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin ();
- cIter != cMap.end ();
- cIter++)
- {
- if (cIter->second->hasSchema ())
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'q', sequence);
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- }
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
-{
- std::string packageName;
- SchemaClassKey key;
-
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(key.name);
- inBuffer.getBin128(key.hash);
-
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
- ClassMap::iterator cIter = pIter->second.find(key);
- if (cIter == pIter->second.end()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- uint32_t sequence = nextRequestSequence++;
-
- EncodeHeader (outBuffer, 'S', sequence);
- outBuffer.putShortString(packageName);
- outBuffer.putShortString(key.name);
- outBuffer.putBin128(key.hash);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-
- SchemaClass* newSchema = new SchemaClass;
- newSchema->pendingSequence = sequence;
- pIter->second[key] = newSchema;
- }
-}
-
-void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
-{
- // If the management package is attached locally (embedded in the broker or
- // linked in via plug-in), call the schema handler directly. If the package
- // is from a remote management agent, send the stored schema information.
-
- if (writeSchemaCall != 0)
- writeSchemaCall(buf);
- else
- buf.putRawData(buffer, bufferLen);
-}
-
-void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- string packageName;
- SchemaClassKey key;
-
- inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
-
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end()) {
- ClassMap cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- SchemaClass* classInfo = cIter->second;
-
- if (classInfo->hasSchema()) {
- EncodeHeader(outBuffer, 's', sequence);
- classInfo->appendSchema (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- else
- sendCommandComplete (replyToKey, sequence, 1, "Schema not available");
- }
- else
- sendCommandComplete (replyToKey, sequence, 1, "Class key not found");
- }
- else
- sendCommandComplete (replyToKey, sequence, 1, "Package not found");
-}
-
-void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
-{
- string packageName;
- SchemaClassKey key;
-
- inBuffer.record();
- inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
- inBuffer.restore();
-
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end()) {
- ClassMap cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
- size_t length = ValidateSchema(inBuffer);
- if (length == 0)
- cMap.erase(key);
- else {
- cIter->second->buffer = (uint8_t*) malloc(length);
- cIter->second->bufferLen = length;
- inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
-
- // Publish a class-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'q');
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
- }
- }
- }
-}
-
-bool ManagementBroker::bankInUse (uint32_t bank)
-{
- for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
- aIter != remoteAgents.end();
- aIter++)
- if (aIter->second->objIdBank == bank)
- return true;
- return false;
-}
-
-uint32_t ManagementBroker::allocateNewBank ()
-{
- while (bankInUse (nextRemoteBank))
- nextRemoteBank++;
-
- uint32_t allocated = nextRemoteBank++;
- writeData ();
- return allocated;
-}
-
-uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
-{
- if (requestedBank == 0 || bankInUse (requestedBank))
- return allocateNewBank ();
- return requestedBank;
-}
-
-void ManagementBroker::deleteOrphanedAgentsLH()
-{
- vector<uint64_t> deleteList;
-
- for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
- uint64_t connectionRef = aIter->first;
- bool found = false;
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++) {
- if (iter->first == connectionRef && !iter->second->isDeleted()) {
- found = true;
- break;
- }
- }
-
- if (!found) {
- deleteList.push_back(connectionRef);
- delete aIter->second;
- }
- }
-
- for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
-
- remoteAgents.erase(*dIter);
- }
-
- deleteList.clear();
-}
-
-void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
-{
- string label;
- uint32_t requestedBank;
- uint32_t assignedBank;
- uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
- Uuid systemId;
-
- moveNewObjectsLH();
- deleteOrphanedAgentsLH();
- RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
- if (aIter != remoteAgents.end()) {
- // There already exists an agent on this session. Reject the request.
- sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
- return;
- }
-
- inBuffer.getShortString (label);
- systemId.decode (inBuffer);
- requestedBank = inBuffer.getLong ();
- assignedBank = assignBankLH (requestedBank);
-
- RemoteAgent* agent = new RemoteAgent;
- agent->objIdBank = assignedBank;
- agent->routingKey = replyToKey;
- agent->connectionRef = connectionRef;
- agent->mgmtObject = new management::Agent (this, agent);
- agent->mgmtObject->set_connectionRef(agent->connectionRef);
- agent->mgmtObject->set_label (label);
- agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
- agent->mgmtObject->set_systemId (systemId);
- agent->mgmtObject->set_objectIdBank (assignedBank);
- addObject (agent->mgmtObject);
-
- remoteAgents[connectionRef] = agent;
-
- // Send an Attach Response
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'a', sequence);
- outBuffer.putLong (assignedBank);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- FieldTable ft;
- FieldTable::ValuePtr value;
-
- moveNewObjectsLH();
-
- ft.decode(inBuffer);
- value = ft.get("_class");
- if (value.get() == 0 || !value->convertsTo<string>())
- {
- // TODO: Send completion with an error code
- return;
- }
-
- string className (value->get<string>());
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject* object = iter->second;
- if (object->getClassName () == className)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementBroker::dispatchAgentCommandLH (Message& msg)
-{
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
-
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo()) {
- const framing::ReplyTo& rt = p->getReplyTo();
- replyToKey = rt.getRoutingKey();
- }
- else
- return;
-
- if (msg.encodedSize() > MA_BUFFER_SIZE) {
- QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " <<
- msg.encodedSize());
- return;
- }
-
- msg.encodeContent(inBuffer);
- inBuffer.reset();
-
- if (!CheckHeader(inBuffer, &opcode, &sequence))
- return;
-
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
- else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
-}
-
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
-{
- PackageMap::iterator pIter = packages.find (name);
- if (pIter != packages.end ())
- return pIter;
-
- // No such package found, create a new map entry.
- pair<PackageMap::iterator, bool> result =
- packages.insert (pair<string, ClassMap> (name, ClassMap ()));
- QPID_LOG (debug, "ManagementBroker added package " << name);
-
- // Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
-
- return result.first;
-}
-
-void ManagementBroker::AddClass(PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- SchemaClassKey key;
- ClassMap& cMap = pIter->second;
-
- key.name = className;
- memcpy (&key.hash, md5Sum, 16);
-
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- return;
-
- // No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
- key.name);
- SchemaClass* classInfo = new SchemaClass;
-
- classInfo->writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
- cIter = cMap.find (key);
-}
-
-void ManagementBroker::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
-{
- buf.putShortString ((*pIter).first);
-}
-
-void ManagementBroker::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
-{
- SchemaClassKey key = (*cIter).first;
-
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
-}
-
-size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
-{
- uint32_t start = inBuffer.getPosition();
- uint32_t end;
- string text;
- uint8_t hash[16];
-
- inBuffer.record();
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
-
- uint16_t propCount = inBuffer.getShort();
- uint16_t statCount = inBuffer.getShort();
- uint16_t methCount = inBuffer.getShort();
- uint16_t evntCount = inBuffer.getShort();
-
- for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- }
-
- for (uint16_t idx = 0; idx < methCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- int argCount = ft.getInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
- }
- }
-
- if (evntCount != 0)
- return 0;
-
- end = inBuffer.getPosition();
- inBuffer.restore(); // restore original position
- return end - start;
-}