diff options
author | Kim van der Riet <kpvdr@apache.org> | 2008-04-04 18:14:42 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2008-04-04 18:14:42 +0000 |
commit | a2ea9d432dc5713dadd4c710a982cc466de3ea8b (patch) | |
tree | d7c955359c88b80b24c4f70146309a806511adda /cpp | |
parent | 2193d76646028d97b7bfff69335d4239954adbe5 (diff) | |
download | qpid-python-a2ea9d432dc5713dadd4c710a982cc466de3ea8b.tar.gz |
Patch from Ted Ross (see QPID-902): This patch contains the following improvements for management:\n1) Schema display cleaned up in the python mgmt-cli\n2) Locking added automatically to management object accessors (manual locking removed from broker/Queue.cpp)\n3) Schemas are now pre-registered with the management agent using a package initializer. This allows management consoles to get schema information for a class even if no instances of the class exist.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@644806 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-x | cpp/managementgen/generate.py | 18 | ||||
-rwxr-xr-x | cpp/managementgen/main.py | 8 | ||||
-rwxr-xr-x | cpp/managementgen/schema.py | 31 | ||||
-rw-r--r-- | cpp/managementgen/templates/Class.cpp | 15 | ||||
-rw-r--r-- | cpp/managementgen/templates/Class.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 3 |
10 files changed, 99 insertions, 59 deletions
diff --git a/cpp/managementgen/generate.py b/cpp/managementgen/generate.py index 4c042bd3f6..e1c01de9b0 100755 --- a/cpp/managementgen/generate.py +++ b/cpp/managementgen/generate.py @@ -187,7 +187,15 @@ class Generator: pass os.rename (tempFile, target) - print "Generated:", target + print "Generated:", target + + def targetPackageFile (self, schema, templateFile): + dot = templateFile.find(".") + if dot == -1: + raise ValueError ("Invalid template file name %s" % templateFile) + extension = templateFile[dot:len (templateFile)] + path = self.dest + "Package" + schema.getPackageName().capitalize() + extension + return path def targetClassFile (self, _class, templateFile): dot = templateFile.find(".") @@ -244,6 +252,14 @@ class Generator: stream = template.expand (method) self.writeIfChanged (stream, target, force) + def makePackageFile (self, templateFile, schema, force=False): + """ Generate a package-specific file """ + template = Template (self.input + templateFile, self) + self.templateFiles.append (templateFile) + target = self.targetPackageFile (schema, templateFile) + stream = template.expand (schema) + self.writeIfChanged (stream, target, force) + def makeSingleFile (self, templateFile, target, force=False): """ Generate a single expanded template """ makefile = Makefile (self.filelists, self.templateFiles) diff --git a/cpp/managementgen/main.py b/cpp/managementgen/main.py index 677c7321ae..87ef3d5298 100755 --- a/cpp/managementgen/main.py +++ b/cpp/managementgen/main.py @@ -48,9 +48,11 @@ if opts.include_prefix == ".": gen = Generator (outdir, templatedir) schema = PackageSchema (typefile, schemafile, opts) -gen.makeClassFiles ("Class.h", schema) -gen.makeClassFiles ("Class.cpp", schema) -gen.makeMethodFiles ("Args.h", schema) +gen.makeClassFiles ("Class.h", schema) +gen.makeClassFiles ("Class.cpp", schema) +gen.makeMethodFiles ("Args.h", schema) +gen.makePackageFile ("Package.h", schema) +gen.makePackageFile ("Package.cpp", schema) if opts.makefile != None: gen.makeSingleFile ("Makefile.mk", opts.makefile, force=True) diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py index fd76ba9112..44fc091372 100755 --- a/cpp/managementgen/schema.py +++ b/cpp/managementgen/schema.py @@ -78,6 +78,7 @@ class SchemaType: def genAccessor (self, stream, varName, changeFlag = None): if self.accessor == "direct": stream.write (" inline void set_" + varName + " (" + self.cpp + " val){\n"); + stream.write (" sys::RWlock::ScopedWlock writeLock (accessLock);\n") if self.style != "mma": stream.write (" " + varName + " = val;\n"); if self.style == "wm": @@ -97,6 +98,7 @@ class SchemaType: stream.write (" }\n"); elif self.accessor == "counter": stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n"); + stream.write (" sys::RWlock::ScopedWlock writeLock (accessLock);\n") stream.write (" " + varName + " += by;\n") if self.style == "wm": stream.write (" if (" + varName + "High < " + varName + ")\n") @@ -105,6 +107,7 @@ class SchemaType: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n"); + stream.write (" sys::RWlock::ScopedWlock writeLock (accessLock);\n") stream.write (" " + varName + " -= by;\n") if self.style == "wm": stream.write (" if (" + varName + "Low > " + varName + ")\n") @@ -796,6 +799,9 @@ class SchemaClass: def genNameLower (self, stream, variables): stream.write (self.name.lower ()) + def genNamePackageCap (self, stream, variables): + stream.write (self.packageName.capitalize ()) + def genNamePackageLower (self, stream, variables): stream.write (self.packageName.lower ()) @@ -867,3 +873,28 @@ class PackageSchema: def getClasses (self): return self.classes + + def genPackageNameUpper (self, stream, variables): + stream.write (self.packageName.upper ()) + + def genPackageNameCap (self, stream, variables): + stream.write (self.packageName.capitalize ()) + + def genClassIncludes (self, stream, variables): + for _class in self.classes: + stream.write ("#include \"qpid/management/") + _class.genNameCap (stream, variables) + stream.write (".h\"\n") + + def genClassRegisters (self, stream, variables): + for _class in self.classes: + stream.write ("agent->RegisterClass (") + _class.genNameCap (stream, variables) + stream.write ("::packageName, ") + _class.genNameCap (stream, variables) + stream.write ("::className, ") + _class.genNameCap (stream, variables) + stream.write ("::md5Sum, ") + _class.genNameCap (stream, variables) + stream.write ("::writeSchema);\n") + diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp index 3c3dfff5a2..5862685670 100644 --- a/cpp/managementgen/templates/Class.cpp +++ b/cpp/managementgen/templates/Class.cpp @@ -35,7 +35,6 @@ string /*MGEN:Class.NameCap*/::packageName = string ("/*MGEN:Class.NamePackage string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/"); uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = {/*MGEN:Class.SchemaMD5*/}; -bool /*MGEN:Class.NameCap*/::firstInst = true; /*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : ManagementObject(_core) @@ -63,18 +62,6 @@ namespace { const string DEFAULT("default"); } -bool /*MGEN:Class.NameCap*/::firstInstance (void) -{ - Mutex::ScopedLock alock(accessorLock); - if (firstInst) - { - firstInst = false; - return true; - } - - return false; -} - void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) { FieldTable ft; @@ -100,6 +87,7 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) void /*MGEN:Class.NameCap*/::writeConfig (Buffer& buf) { + sys::RWlock::ScopedRlock readLock (accessLock); configChanged = false; writeTimestamps (buf); @@ -108,6 +96,7 @@ void /*MGEN:Class.NameCap*/::writeConfig (Buffer& buf) void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf, bool skipHeaders) { + sys::RWlock::ScopedWlock writeLock (accessLock); instChanged = false; if (!skipHeaders) diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h index 047d7cc950..d95a06479e 100644 --- a/cpp/managementgen/templates/Class.h +++ b/cpp/managementgen/templates/Class.h @@ -23,7 +23,6 @@ /*MGEN:Root.Disclaimer*/ -#include "qpid/sys/Mutex.h" #include "qpid/management/ManagementObject.h" #include "qpid/framing/Uuid.h" @@ -37,7 +36,6 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject static std::string packageName; static std::string className; static uint8_t md5Sum[16]; - static bool firstInst; // Configuration Elements /*MGEN:Class.ConfigDeclarations*/ @@ -52,13 +50,12 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf); writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; } - bool firstInstance (void); /*MGEN:Class.InstChangedStub*/ public: + friend class Package/*MGEN:Class.NamePackageCap*/; typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; - qpid::sys::Mutex accessorLock; /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); ~/*MGEN:Class.NameCap*/ (void); diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index a183ce9d02..03036fb825 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -29,6 +29,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "qpid/management/PackageQpid.h" #include "qpid/management/ManagementExchange.h" #include "qpid/management/ArgsBrokerEcho.h" @@ -112,25 +113,13 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack), previewSessionManager(conf.ack) { - // Early-Initialize plugins - const Plugin::Plugins& plugins=Plugin::getPlugins(); - for (Plugin::Plugins::const_iterator i = plugins.begin(); - i != plugins.end(); - i++) - (*i)->earlyInitialize(*this); - - // If no plugin store module registered itself, set up the null store. - if (store == 0) - setStore (new NullMessageStore (false)); - - queues.setStore (store); - dtxManager.setStore (store); - if(conf.enableMgmt){ + QPID_LOG(info, "Management enabled"); ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), conf.mgmtPubInterval); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); + qpid::management::PackageQpid packageInitializer (managementAgent); System* system = new System (); systemObject = System::shared_ptr (system); @@ -157,6 +146,20 @@ Broker::Broker(const Broker::Options& conf) : exchanges.setParent (vhost); } + // Early-Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->earlyInitialize(*this); + + // If no plugin store module registered itself, set up the null store. + if (store == 0) + setStore (new NullMessageStore (false)); + + queues.setStore (store); + dtxManager.setStore (store); + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if (store != 0) { @@ -172,7 +175,6 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index a405971805..24ed6825b4 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -111,7 +111,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -119,7 +118,6 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -138,7 +136,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -157,7 +154,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -348,7 +344,6 @@ void Queue::consume(Consumer&, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_consumers (); } } @@ -359,7 +354,6 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = false; if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->dec_consumers (); } } @@ -390,7 +384,6 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index a5ed84fb32..d6a91ab2c1 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -114,6 +114,16 @@ void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, dExchange = _dexchange; } +void ManagementAgent::RegisterClass (string packageName, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + RWlock::ScopedWlock writeLock (userLock); + PackageMap::iterator pIter = FindOrAddPackage (packageName); + AddClassLocal (pIter, className, md5Sum, schemaCall); +} + void ManagementAgent::addObject (ManagementObject::shared_ptr object, uint64_t /*persistenceId*/, uint64_t /*idOffset*/) @@ -128,15 +138,6 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object, object->setObjectId (objectId); managementObjects[objectId] = object; - - // If we've already seen instances of this object type, we're done. - if (!object->firstInstance ()) - return; - - // This is the first object of this type that we've seen, update the schema - // inventory. - PackageMap::iterator pIter = FindOrAddPackage (object->getPackageName ()); - AddClassLocal (pIter, object); } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -623,14 +624,16 @@ ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::st return result.first; } -void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, - ManagementObject::shared_ptr object) +void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; - key.name = object->getClassName (); - memcpy (&key.hash, object->getMd5Sum (), 16); + key.name = className; + memcpy (&key.hash, md5Sum, 16); ClassMap::iterator cIter = cMap.find (key); if (cIter != cMap.end ()) @@ -641,7 +644,7 @@ void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, key.name); SchemaClass classInfo; - classInfo.writeSchemaCall = object->getWriteSchemaCall (); + classInfo.writeSchemaCall = schemaCall; cMap[key] = classInfo; // TODO: Publish a class-indication message diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index f2cd0373c0..ac665eb50d 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -53,6 +53,10 @@ class ManagementAgent void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); + void RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); void addObject (ManagementObject::shared_ptr object, uint64_t persistenceId = 0, uint64_t idOffset = 10); @@ -160,7 +164,9 @@ class ManagementAgent PackageMap::iterator FindOrAddPackage (std::string name); void AddClassLocal (PackageMap::iterator pIter, - ManagementObject::shared_ptr object); + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); void EncodePackageIndication (qpid::framing::Buffer& buf, PackageMap::iterator pIter); void EncodeClassIndication (qpid::framing::Buffer& buf, diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 23042ad988..ca8a0fd558 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -24,6 +24,7 @@ #include "Manageable.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Mutex.h" #include <qpid/framing/Buffer.h> #include <boost/shared_ptr.hpp> #include <map> @@ -44,6 +45,7 @@ class ManagementObject bool instChanged; bool deleted; Manageable* coreObject; + sys::RWlock accessLock; static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; @@ -84,7 +86,6 @@ class ManagementObject virtual ~ManagementObject () {} virtual writeSchemaCall_t getWriteSchemaCall (void) = 0; - virtual bool firstInstance (void) = 0; virtual void writeConfig (qpid::framing::Buffer& buf) = 0; virtual void writeInstrumentation (qpid::framing::Buffer& buf, bool skipHeaders = false) = 0; |