diff options
author | Kim van der Riet <kpvdr@apache.org> | 2008-04-04 18:14:42 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2008-04-04 18:14:42 +0000 |
commit | a2ea9d432dc5713dadd4c710a982cc466de3ea8b (patch) | |
tree | d7c955359c88b80b24c4f70146309a806511adda /cpp/src | |
parent | 2193d76646028d97b7bfff69335d4239954adbe5 (diff) | |
download | qpid-python-a2ea9d432dc5713dadd4c710a982cc466de3ea8b.tar.gz |
Patch from Ted Ross (see QPID-902): This patch contains the following improvements for management:\n1) Schema display cleaned up in the python mgmt-cli\n2) Locking added automatically to management object accessors (manual locking removed from broker/Queue.cpp)\n3) Schemas are now pre-registered with the management agent using a package initializer. This allows management consoles to get schema information for a class even if no instances of the class exist.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@644806 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 3 |
5 files changed, 43 insertions, 38 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index a183ce9d02..03036fb825 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -29,6 +29,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -112,25 +113,13 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack), previewSessionManager(conf.ack) { - // Early-Initialize plugins - const Plugin::Plugins& plugins=Plugin::getPlugins(); - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->earlyInitialize(*this); - - // If no plugin store module registered itself, set up the null store. - if (store == 0) - setStore (new NullMessageStore (false)); - - queues.setStore (store); - dtxManager.setStore (store); - if(conf.enableMgmt){ + QPID_LOG(info, "Management enabled"); ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), conf.mgmtPubInterval); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); + qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (); systemObject = System::shared_ptr (system); @@ -157,6 +146,20 @@ Broker::Broker(const Broker::Options& conf) : exchanges.setParent (vhost); } + // Early-Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->earlyInitialize(*this); + + // If no plugin store module registered itself, set up the null store. + if (store == 0) + setStore (new NullMessageStore (false)); + + queues.setStore (store); + dtxManager.setStore (store); + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { @@ -172,7 +175,6 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index a405971805..24ed6825b4 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -111,7 +111,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -119,7 +118,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -138,7 +136,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -157,7 +154,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -348,7 +344,6 @@ void Queue::consume(Consumer&, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_consumers (); } } @@ -359,7 +354,6 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = false; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->dec_consumers (); } } @@ -390,7 +384,6 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index a5ed84fb32..d6a91ab2c1 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -114,6 +114,16 @@ void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, dExchange = _dexchange; } +void ManagementAgent::RegisterClass (string packageName, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + RWlock::ScopedWlock writeLock (userLock); + PackageMap::iterator pIter = FindOrAddPackage (packageName); + AddClassLocal (pIter, className, md5Sum, schemaCall); +} + void ManagementAgent::addObject (ManagementObject::shared_ptr object, uint64_t /*persistenceId*/, uint64_t /*idOffset*/) @@ -128,15 +138,6 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object, object->setObjectId (objectId); managementObjects[objectId] = object; - - // If we've already seen instances of this object type, we're done. - if (!object->firstInstance ()) - return; - - // This is the first object of this type that we've seen, update the schema - // inventory. - PackageMap::iterator pIter = FindOrAddPackage (object->getPackageName ()); - AddClassLocal (pIter, object); } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -623,14 +624,16 @@ ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::st return result.first; } -void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, - ManagementObject::shared_ptr object) +void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; - key.name = object->getClassName (); - memcpy (&key.hash, object->getMd5Sum (), 16); + key.name = className; + memcpy (&key.hash, md5Sum, 16); ClassMap::iterator cIter = cMap.find (key); if (cIter != cMap.end ()) @@ -641,7 +644,7 @@ void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, key.name); SchemaClass classInfo; - classInfo.writeSchemaCall = object->getWriteSchemaCall (); + classInfo.writeSchemaCall = schemaCall; cMap[key] = classInfo; // TODO: Publish a class-indication message diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index f2cd0373c0..ac665eb50d 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -53,6 +53,10 @@ class ManagementAgent void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); + void RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); void addObject (ManagementObject::shared_ptr object, uint64_t persistenceId = 0, uint64_t idOffset = 10); @@ -160,7 +164,9 @@ class ManagementAgent PackageMap::iterator FindOrAddPackage (std::string name); void AddClassLocal (PackageMap::iterator pIter, - ManagementObject::shared_ptr object); + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); void EncodePackageIndication (qpid::framing::Buffer& buf, PackageMap::iterator pIter); void EncodeClassIndication (qpid::framing::Buffer& buf, diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 23042ad988..ca8a0fd558 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -24,6 +24,7 @@ #include "Manageable.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Mutex.h" #include <qpid/framing/Buffer.h> #include <boost/shared_ptr.hpp> #include <map> @@ -44,6 +45,7 @@ class ManagementObject bool instChanged; bool deleted; Manageable* coreObject; + sys::RWlock accessLock; static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; @@ -84,7 +86,6 @@ class ManagementObject virtual ~ManagementObject () {} virtual writeSchemaCall_t getWriteSchemaCall (void) = 0; - virtual bool firstInstance (void) = 0; virtual void writeConfig (qpid::framing::Buffer& buf) = 0; virtual void writeInstrumentation (qpid::framing::Buffer& buf, bool skipHeaders = false) = 0; |