diff options
author | Ted Ross <tross@apache.org> | 2008-11-13 04:15:15 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-11-13 04:15:15 +0000 |
commit | 45b4ce55e9bb52f9d753b9e61ec19f07dc9f5009 (patch) | |
tree | 2e976b7d615f521ae5f8cdfb6432faaa6233402b /cpp | |
parent | 44d591ab8b7a1edf567ce6f2fba170cdf619ade1 (diff) | |
download | qpid-python-45b4ce55e9bb52f9d753b9e61ec19f07dc9f5009.tar.gz |
Updated qmf-agent API to allow user to specify uid, password, mechanism, and protocol.
Fixed qmf-console bug related to routing keys of object messages.
Pass the binding key into the management agent to allow for selective broadcast
of object data.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@713631 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgent.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 27 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 4 |
6 files changed, 84 insertions, 59 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h index 03baa10aa2..296bb17e62 100644 --- a/cpp/src/qpid/agent/ManagementAgent.h +++ b/cpp/src/qpid/agent/ManagementAgent.h @@ -81,24 +81,28 @@ class ManagementAgent // storeFile - File where this process has read and write access. This // file shall be used to store persistent state. // - virtual void init (std::string brokerHost = "localhost", - uint16_t brokerPort = 5672, - uint16_t intervalSeconds = 10, - bool useExternalThread = false, - std::string storeFile = "") = 0; + virtual void init(const std::string& brokerHost = "localhost", + uint16_t brokerPort = 5672, + uint16_t intervalSeconds = 10, + bool useExternalThread = false, + const std::string& storeFile = "", + const std::string& uid = "guest", + const std::string& pwd = "guest", + const std::string& mech = "PLAIN", + const std::string& proto = "tcp") = 0; // Register a schema with the management agent. This is normally called by the // package initializer generated by the management code generator. // virtual void - registerClass(std::string& packageName, - std::string& className, + registerClass(const std::string& packageName, + const std::string& className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall) = 0; virtual void - registerEvent(std::string& packageName, - std::string& eventName, + registerEvent(const std::string& packageName, + const std::string& eventName, uint8_t* md5Sum, management::ManagementEvent::writeSchemaCall_t schemaCall) = 0; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 06acf6b0e3..3f863d41d7 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -80,7 +80,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : extThread(false), writeFd(-1), readFd(-1), - connected(false), lastFailure("never connected"), + initialized(false), connected(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), debugLevel(0), connThreadBody(*this), connThread(connThreadBody), @@ -102,18 +102,26 @@ ManagementAgentImpl::~ManagementAgentImpl() } } -void ManagementAgentImpl::init(string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread, - string _storeFile) +void ManagementAgentImpl::init(const string& brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread, + const string& _storeFile, + const string& uid, + const string& pwd, + const string& mech, + const string& proto) { interval = intervalSeconds; extThread = useExternalThread; storeFile = _storeFile; nextObjectId = 1; - host = brokerHost; - port = brokerPort; + connectionSettings.protocol = proto; + connectionSettings.host = brokerHost; + connectionSettings.port = brokerPort; + connectionSettings.username = uid; + connectionSettings.password = pwd; + connectionSettings.mechanism = mech; if (debugLevel) cout << "QMF Agent Initialized: broker=" << brokerHost << ":" << brokerPort << @@ -139,10 +147,12 @@ void ManagementAgentImpl::init(string brokerHost, if ((bootSequence & 0xF000) != 0) bootSequence = 1; storeData(true); + + initialized = true; } -void ManagementAgentImpl::registerClass(string& packageName, - string& className, +void ManagementAgentImpl::registerClass(const string& packageName, + const string& className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall) { @@ -151,8 +161,8 @@ void ManagementAgentImpl::registerClass(string& packageName, addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } -void ManagementAgentImpl::registerEvent(string& packageName, - string& eventName, +void ManagementAgentImpl::registerEvent(const string& packageName, + const string& eventName, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall) { @@ -605,7 +615,6 @@ void ManagementAgentImpl::periodicProcessing() Mutex::ScopedLock lock(agentLock); char msgChars[BUFSIZE]; uint32_t contentSize; - string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; if (!connected) @@ -692,8 +701,10 @@ void ManagementAgentImpl::periodicProcessing() contentSize = BUFSIZE - msgBuffer.available(); if (contentSize > 0) { msgBuffer.reset(); - routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); + stringstream key; + key << "console.obj." << baseObject->getPackageName() << "." << baseObject->getClassName() << "." << + assignedBrokerBank << "." << assignedAgentBank; + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); } } @@ -721,10 +732,10 @@ void ManagementAgentImpl::ConnectionThread::run() while (true) { try { - if (!agent.host.empty()) { + if (agent.initialized) { if (agent.debugLevel) cout << "QMF Agent attempting to connect to the broker..." << endl; - connection.open(agent.host.c_str(), agent.port); + connection.open(agent.connectionSettings); session = connection.newSession(queueName.str()); subscriptions = new client::SubscriptionManager(session); diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 5b6437944f..4ba9d7262a 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -22,6 +22,7 @@ #include "ManagementAgent.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Session.h" #include "qpid/client/AsyncSession.h" @@ -49,19 +50,23 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen // Methods from ManagementAgent // int getMaxThreads() { return 1; } - void init(std::string brokerHost = "localhost", - uint16_t brokerPort = 5672, - uint16_t intervalSeconds = 10, - bool useExternalThread = false, - std::string storeFile = ""); + void init(const std::string& brokerHost = "localhost", + uint16_t brokerPort = 5672, + uint16_t intervalSeconds = 10, + bool useExternalThread = false, + const std::string& storeFile = "", + const std::string& uid = "guest", + const std::string& pwd = "guest", + const std::string& mech = "PLAIN", + const std::string& proto = "tcp"); bool isConnected() { return connected; } std::string& getLastFailure() { return lastFailure; } - void registerClass(std::string& packageName, - std::string& className, + void registerClass(const std::string& packageName, + const std::string& className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - void registerEvent(std::string& packageName, - std::string& eventName, + void registerEvent(const std::string& packageName, + const std::string& eventName, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0); @@ -130,8 +135,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen sys::Mutex agentLock; sys::Mutex addLock; framing::Uuid systemId; - std::string host; - uint16_t port; + client::ConnectionSettings connectionSettings; + bool initialized; bool connected; std::string lastFailure; diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index e2c735e660..23ef8d9e6a 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -113,7 +113,7 @@ ManagementBroker::~ManagementBroker () } } -void ManagementBroker::configure(string _dataDir, uint16_t _interval, +void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, qpid::broker::Broker* _broker, int _threads) { dataDir = _dataDir; @@ -178,8 +178,8 @@ void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchang dExchange = _dexchange; } -void ManagementBroker::registerClass (string& packageName, - string& className, +void ManagementBroker::registerClass (const string& packageName, + const string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { @@ -188,8 +188,8 @@ void ManagementBroker::registerClass (string& packageName, addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } -void ManagementBroker::registerEvent (string& packageName, - string& eventName, +void ManagementBroker::registerEvent (const string& packageName, + const string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { @@ -251,7 +251,7 @@ void ManagementBroker::Periodic::fire () broker.periodicProcessing (); } -void ManagementBroker::clientAdded (void) +void ManagementBroker::clientAdded (const std::string& /*routingKey*/) { Mutex::ScopedLock lock (userLock); @@ -386,7 +386,7 @@ void ManagementBroker::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName (); + routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0"; sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -398,7 +398,7 @@ void ManagementBroker::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName (); + routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0"; sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -721,7 +721,7 @@ bool ManagementBroker::bankInUse (uint32_t bank) for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) - if (aIter->second->objIdBank == bank) + if (aIter->second->brokerBank == bank) return true; return false; } @@ -796,7 +796,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe assignedBank = assignBankLH(requestedAgentBank); RemoteAgent* agent = new RemoteAgent; - agent->objIdBank = assignedBank; + agent->brokerBank = brokerBank; + agent->agentBank = assignedBank; agent->routingKey = replyToKey; agent->connectionRef = connectionRef; agent->mgmtObject = new _qmf::Agent (this, agent); @@ -1006,7 +1007,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri void ManagementBroker::addClassLH(uint8_t kind, PackageMap::iterator pIter, - string& className, + const string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 3564d462df..77f4a53836 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -47,30 +47,33 @@ public: ManagementBroker (); virtual ~ManagementBroker (); - void configure (std::string dataDir, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize); + void configure (const std::string& dataDir, uint16_t interval, + qpid::broker::Broker* broker, int threadPoolSize); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); int getMaxThreads () { return threadPoolSize; } - void registerClass (std::string& packageName, - std::string& className, + void registerClass (const std::string& packageName, + const std::string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - void registerEvent (std::string& packageName, - std::string& eventName, + void registerEvent (const std::string& packageName, + const std::string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); ObjectId addObject (ManagementObject* object, uint64_t persistId = 0); void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); - void clientAdded (); + void clientAdded (const std::string& routingKey); bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); const framing::Uuid& getUuid() const { return uuid; } // Stubs for remote management agent calls - void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); } + void init (const std::string&, uint16_t, uint16_t, bool, + const std::string&, const std::string&, const std::string&, + const std::string&, const std::string&) { assert(0); } uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } @@ -91,7 +94,8 @@ private: // struct RemoteAgent : public Manageable { - uint32_t objIdBank; + uint32_t brokerBank; + uint32_t agentBank; std::string routingKey; ObjectId connectionRef; qmf::org::apache::qpid::broker::Agent* mgmtObject; @@ -195,7 +199,7 @@ private: PackageMap::iterator findOrAddPackageLH(std::string name); void addClassLH(uint8_t kind, PackageMap::iterator pIter, - std::string& className, + const std::string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); void encodePackageIndication (framing::Buffer& buf, diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index 4ccf8e68c9..4dcafbfcdd 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -56,8 +56,8 @@ bool ManagementExchange::bind (Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) { - managementAgent->clientAdded (); - return TopicExchange::bind (queue, routingKey, args); + managementAgent->clientAdded(routingKey); + return TopicExchange::bind(queue, routingKey, args); } void ManagementExchange::setManagmentAgent (ManagementBroker* agent) |