diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-02-28 18:55:21 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-02-28 18:55:21 +0000 |
commit | ac3f850123c903f00c163d6d2dbad22d98aec7a2 (patch) | |
tree | 2e622a3e9349a9062454d16bf4bca83a5a3e9d90 /cpp | |
parent | 1820dd421a096ed184a08deee9512e809312fed2 (diff) | |
download | qpid-python-ac3f850123c903f00c163d6d2dbad22d98aec7a2.tar.gz |
QPID-820 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@632087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-x | cpp/managementgen/main.py | 8 | ||||
-rwxr-xr-x | cpp/managementgen/schema.py | 85 | ||||
-rw-r--r-- | cpp/managementgen/templates/Class.cpp | 31 | ||||
-rw-r--r-- | cpp/managementgen/templates/Class.h | 19 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/System.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/broker/System.h | 51 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/management/Manageable.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 292 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 84 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 22 |
18 files changed, 639 insertions, 100 deletions
diff --git a/cpp/managementgen/main.py b/cpp/managementgen/main.py index de8ce4cbe6..677c7321ae 100755 --- a/cpp/managementgen/main.py +++ b/cpp/managementgen/main.py @@ -28,6 +28,9 @@ usage = "usage: %prog [options] schema-document type-document template-director parser = OptionParser (usage=usage) parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE", help="Makefile fragment") +parser.add_option ("-i", "--include-prefix", dest="include_prefix", metavar="PATH", + default="qpid/management/", + help="Prefix for #include of generated headers in generated source, default: qpid/management/") (opts, args) = parser.parse_args () @@ -39,8 +42,11 @@ typefile = args[1] templatedir = args[2] outdir = args[3] +if opts.include_prefix == ".": + opts.include_prefix = None + gen = Generator (outdir, templatedir) -schema = PackageSchema (typefile, schemafile) +schema = PackageSchema (typefile, schemafile, opts) gen.makeClassFiles ("Class.h", schema) gen.makeClassFiles ("Class.cpp", schema) diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py index a459db7a47..fd76ba9112 100755 --- a/cpp/managementgen/schema.py +++ b/cpp/managementgen/schema.py @@ -21,6 +21,7 @@ from xml.dom.minidom import parse, parseString, Node from cStringIO import StringIO +import md5 #===================================================================================== # @@ -575,15 +576,18 @@ class SchemaEvent: def getArgCount (self): return len (self.args) -#===================================================================================== -# -#===================================================================================== + class SchemaClass: - def __init__ (self, node, typespec): + def __init__ (self, package, node, typespec, fragments, options): + self.packageName = package self.configElements = [] self.instElements = [] self.methods = [] self.events = [] + self.options = options + self.md5Sum = md5.new () + + self.hash (node) attrs = node.attributes self.name = attrs['name'].nodeValue @@ -607,9 +611,40 @@ class SchemaClass: sub = SchemaEvent (self, child, typespec) self.events.append (sub) + elif child.nodeName == 'group': + self.expandFragment (child, fragments) + else: raise ValueError ("Unknown class tag '%s'" % child.nodeName) + def hash (self, node): + attrs = node.attributes + self.md5Sum.update (node.nodeName) + + for idx in range (attrs.length): + self.md5Sum.update (attrs.item(idx).nodeName) + self.md5Sum.update (attrs.item(idx).nodeValue) + + for child in node.childNodes: + if child.nodeType == Node.ELEMENT_NODE: + self.hash (child) + + def expandFragment (self, node, fragments): + attrs = node.attributes + name = attrs['name'].nodeValue + for fragment in fragments: + if fragment.name == name: + for config in fragment.configElements: + self.configElements.append (config) + for inst in fragment.instElements: + self.instElements.append (inst) + for method in fragment.methods: + self.methods.append (method) + for event in fragment.events: + self.events.append (event) + return + raise ValueError ("Undefined group '%s'" % name) + def getName (self): return self.name @@ -644,13 +679,9 @@ class SchemaClass: def genConstructorArgs (self, stream, variables): # Constructor args are config elements with read-create access result = "" - first = 1 for element in self.configElements: if element.isConstructorArg (): - if first == 1: - first = 0 - else: - stream.write (", ") + stream.write (", ") element.genFormalParam (stream) def genConstructorInits (self, stream, variables): @@ -715,8 +746,8 @@ class SchemaClass: def genMethodArgIncludes (self, stream, variables): for method in self.methods: if method.getArgCount () > 0: - stream.write ("#include \"qpid/management/Args" +\ - method.getFullName () + ".h\"\n") + stream.write ("#include \"" + (self.options.include_prefix or "") +\ + "Args" + method.getFullName () + ".h\"\n") def genMethodCount (self, stream, variables): stream.write ("%d" % len (self.methods)) @@ -765,13 +796,16 @@ class SchemaClass: def genNameLower (self, stream, variables): stream.write (self.name.lower ()) + def genNamePackageLower (self, stream, variables): + stream.write (self.packageName.lower ()) + def genNameUpper (self, stream, variables): stream.write (self.name.upper ()) def genParentArg (self, stream, variables): for config in self.configElements: if config.isParentRef == 1: - stream.write (" _parent") + stream.write (", Manageable* _parent") return def genParentRefAssignment (self, stream, variables): @@ -781,6 +815,13 @@ class SchemaClass: " = _parent->GetManagementObject ()->getObjectId ();") return + def genSchemaMD5 (self, stream, variables): + sum = self.md5Sum.digest () + for idx in range (len (sum)): + if idx != 0: + stream.write (",") + stream.write (hex (ord (sum[idx]))) + def genWriteConfig (self, stream, variables): for config in self.configElements: config.genWrite (stream); @@ -790,14 +831,13 @@ class SchemaClass: inst.genWrite (stream); -#===================================================================================== -# -#===================================================================================== + class PackageSchema: - def __init__ (self, typefile, schemafile): + def __init__ (self, typefile, schemafile, options): - self.classes = [] - self.typespec = TypeSpec (typefile) + self.classes = [] + self.fragments = [] + self.typespec = TypeSpec (typefile) dom = parse (schemafile) document = dom.documentElement @@ -810,8 +850,15 @@ class PackageSchema: for child in children: if child.nodeType == Node.ELEMENT_NODE: if child.nodeName == 'class': - cls = SchemaClass (child, self.typespec) + cls = SchemaClass (self.packageName, child, self.typespec, + self.fragments, options) self.classes.append (cls) + + elif child.nodeName == 'group': + cls = SchemaClass (self.packageName, child, self.typespec, + self.fragments, options) + self.fragments.append (cls) + else: raise ValueError ("Unknown schema tag '%s'" % child.nodeName) diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp index d87d11f767..2a3f71e262 100644 --- a/cpp/managementgen/templates/Class.cpp +++ b/cpp/managementgen/templates/Class.cpp @@ -31,11 +31,14 @@ using namespace qpid::sys; using namespace qpid::framing; using std::string; -bool /*MGEN:Class.NameCap*/::schemaNeeded = true; - -/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core, Manageable*/*MGEN:Class.ParentArg*/, - /*MGEN:Class.ConstructorArgs*/) : - ManagementObject(_core, "/*MGEN:Class.NameLower*/") +string /*MGEN:Class.NameCap*/::packageName = string ("/*MGEN:Class.NamePackageLower*/"); +string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/"); +uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = + {/*MGEN:Class.SchemaMD5*/}; +bool /*MGEN:Class.NameCap*/::firstInst = true; + +/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : + ManagementObject(_core) /*MGEN:Class.ConstructorInits*/ { /*MGEN:Class.ParentRefAssignment*/ @@ -60,14 +63,26 @@ namespace { const string DEFAULT("default"); } +bool /*MGEN:Class.NameCap*/::firstInstance (void) +{ + Mutex::ScopedLock alock(accessorLock); + if (firstInst) + { + firstInst = false; + return true; + } + + return false; +} + void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) { FieldTable ft; - schemaNeeded = false; - // Schema class header: - buf.putShortString (className); // Class Name + buf.putShortString (packageName); // Package Name + buf.putShortString (className); // Class Name + buf.putBin128 (md5Sum); // Schema Hash buf.putShort (/*MGEN:Class.ConfigCount*/); // Config Element Count buf.putShort (/*MGEN:Class.InstCount*/); // Inst Element Count buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h index ba6a1183e2..82fac00d47 100644 --- a/cpp/managementgen/templates/Class.h +++ b/cpp/managementgen/templates/Class.h @@ -33,22 +33,24 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject { private: - static bool schemaNeeded; + static std::string packageName; + static std::string className; + static uint8_t md5Sum[16]; + static bool firstInst; // Configuration Elements /*MGEN:Class.ConfigDeclarations*/ // Instrumentation Elements /*MGEN:Class.InstDeclarations*/ // Private Methods - std::string getObjectName (void) { return "/*MGEN:Class.NameLower*/"; } - void writeSchema (qpid::framing::Buffer& buf); + static void writeSchema (qpid::framing::Buffer& buf); void writeConfig (qpid::framing::Buffer& buf); void writeInstrumentation (qpid::framing::Buffer& buf); - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf); + writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; } + bool firstInstance (void); /*MGEN:Class.InstChangedStub*/ public: @@ -56,10 +58,13 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; qpid::sys::Mutex accessorLock; - /*MGEN:Class.NameCap*/ (Manageable* coreObject, Manageable* parentObject, - /*MGEN:Class.ConstructorArgs*/); + /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); ~/*MGEN:Class.NameCap*/ (void); + std::string getPackageName (void) { return packageName; } + std::string getClassName (void) { return className; } + uint8_t* getMd5Sum (void) { return md5Sum; } + // Method IDs /*MGEN:Class.MethodIdDeclarations*/ // Accessor Methods diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index e3b95e045f..becccb4224 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -213,6 +213,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SessionContext.h \ qpid/broker/SessionHandler.cpp \ qpid/broker/SemanticHandler.cpp \ + qpid/broker/System.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ @@ -331,6 +332,7 @@ nobase_include_HEADERS = \ qpid/broker/RecoveryManagerImpl.h \ qpid/broker/SemanticHandler.h \ qpid/broker/SessionManager.h \ + qpid/broker/System.h \ qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 9bfa868d9c..8b70831cf7 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -131,13 +131,18 @@ Broker::Broker(const Broker::Options& conf) : managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); - mgmtObject = management::Broker::shared_ptr (new management::Broker (this, 0, 0, conf.port)); + System* system = new System (); + systemObject = System::shared_ptr (system); + + mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port)); mgmtObject->set_workerThreads (conf.workerThreads); mgmtObject->set_maxConns (conf.maxConnections); mgmtObject->set_connBacklog (conf.connectionBacklog); mgmtObject->set_stagingThreshold (conf.stagingThreshold); mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); mgmtObject->set_version (PACKAGE_VERSION); + mgmtObject->set_dataDirEnabled (dataDir.isEnabled ()); + mgmtObject->set_dataDir (dataDir.getPath ()); managementAgent->addObject (mgmtObject, 1, 0); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 153eabc6b3..9e5191825d 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -32,6 +32,7 @@ #include "SessionManager.h" #include "PreviewSessionManager.h" #include "Vhost.h" +#include "System.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qpid/management/Broker.h" @@ -142,6 +143,7 @@ class Broker : public sys::Runnable, public Plugin::Target, management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; + System::shared_ptr systemObject; void declareStandardExchange(const std::string& name, const std::string& type); }; diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp new file mode 100644 index 0000000000..87d5185b97 --- /dev/null +++ b/cpp/src/qpid/broker/System.cpp @@ -0,0 +1,48 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "System.h" +#include "qpid/management/ManagementAgent.h" +#include <sys/utsname.h> + +using namespace qpid::broker; +using qpid::management::ManagementAgent; + +System::System () +{ + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::System::shared_ptr + (new management::System (this, "host")); + struct utsname _uname; + if (uname (&_uname) == 0) + { + mgmtObject->set_osName (std::string (_uname.sysname)); + mgmtObject->set_nodeName (std::string (_uname.nodename)); + mgmtObject->set_release (std::string (_uname.release)); + mgmtObject->set_version (std::string (_uname.version)); + mgmtObject->set_machine (std::string (_uname.machine)); + } + + agent->addObject (mgmtObject, 3, 0); + } +} + diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h new file mode 100644 index 0000000000..a1a710f2b2 --- /dev/null +++ b/cpp/src/qpid/broker/System.h @@ -0,0 +1,51 @@ +#ifndef _BrokerSystem_ +#define _BrokerSystem_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/System.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class System : public management::Manageable +{ + private: + + management::System::shared_ptr mgmtObject; + + public: + + typedef boost::shared_ptr<System> shared_ptr; + + System (); + + management::ManagementObject::shared_ptr GetManagementObject (void) const + { return mgmtObject; } + + management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) + { return management::Manageable::STATUS_OK; } +}; + +}} + +#endif /*!_BrokerSystem_*/ diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp index 7eadf377b9..c0cd210042 100644 --- a/cpp/src/qpid/framing/Buffer.cpp +++ b/cpp/src/qpid/framing/Buffer.cpp @@ -74,6 +74,31 @@ void Buffer::putLongLong(uint64_t i){ putLong(lo); } +void Buffer::putFloat(float f){ + union { + uint32_t i; + float f; + } val; + + val.f = f; + putLong (val.i); +} + +void Buffer::putDouble(double f){ + union { + uint64_t i; + double f; + } val; + + val.f = f; + putLongLong (val.i); +} + +void Buffer::putBin128(uint8_t* b){ + memcpy (data + position, b, 16); + position += 16; +} + uint8_t Buffer::getOctet(){ return (uint8_t) data[position++]; } @@ -104,6 +129,24 @@ uint64_t Buffer::getLongLong(){ return hi | lo; } +float Buffer::getFloat(){ + union { + uint32_t i; + float f; + } val; + val.i = getLong(); + return val.f; +} + +double Buffer::getDouble(){ + union { + uint64_t i; + double f; + } val; + val.i = getLongLong(); + return val.f; +} + template <> uint64_t Buffer::getUInt<1>() { return getOctet(); @@ -172,6 +215,11 @@ void Buffer::getLongString(string& s){ position += len; } +void Buffer::getBin128(uint8_t* b){ + memcpy (b, data + position, 16); + position += 16; +} + void Buffer::putRawData(const string& s){ uint32_t len = s.length(); s.copy(data + position, len); diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h index 5ab897d351..d0ca41f82b 100644 --- a/cpp/src/qpid/framing/Buffer.h +++ b/cpp/src/qpid/framing/Buffer.h @@ -57,11 +57,16 @@ public: void putShort(uint16_t i); void putLong(uint32_t i); void putLongLong(uint64_t i); + void putFloat(float f); + void putDouble(double f); + void putBin128(uint8_t* b); - uint8_t getOctet(); + uint8_t getOctet(); uint16_t getShort(); uint32_t getLong(); uint64_t getLongLong(); + float getFloat(); + double getDouble(); template <int n> uint64_t getUInt(); @@ -73,6 +78,7 @@ public: void putLongString(const string& s); void getShortString(string& s); void getLongString(string& s); + void getBin128(uint8_t* b); void putRawData(const string& s); void getRawData(string& s, uint32_t size); diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h index 155b71da54..1fb890c8c6 100644 --- a/cpp/src/qpid/management/Manageable.h +++ b/cpp/src/qpid/management/Manageable.h @@ -23,6 +23,7 @@ #include "ManagementObject.h" #include "Args.h" #include <string> +#include <boost/shared_ptr.hpp> namespace qpid { namespace management { diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 709f2a0ecd..bdbabbaf47 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -24,13 +24,13 @@ #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> #include <qpid/broker/MessageDelivery.h> -#include <qpid/framing/AMQFrame.h> #include <list> using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; using namespace qpid::sys; +using namespace std; ManagementAgent::shared_ptr ManagementAgent::agent; bool ManagementAgent::enabled = 0; @@ -39,6 +39,7 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); + nextRemotePrefix = 101; } ManagementAgent::~ManagementAgent () {} @@ -87,6 +88,15 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object, object->setObjectId (objectId); managementObjects[objectId] = object; + + // If we've already seen instances of this object type, we're done. + if (!object->firstInstance ()) + return; + + // This is the first object of this type that we've seen, update the schema + // inventory. + PackageMap::iterator pIter = FindOrAddPackage (object->getPackageName ()); + AddClassLocal (pIter, object); } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -102,15 +112,12 @@ void ManagementAgent::Periodic::fire () void ManagementAgent::clientAdded (void) { - RWlock::ScopedRlock readLock (userLock); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { ManagementObject::shared_ptr object = iter->second; - object->setAllChanged (); - object->setSchemaNeeded (); + object->setAllChanged (); } } @@ -142,6 +149,9 @@ void ManagementAgent::SendBuffer (Buffer& buf, broker::Exchange::shared_ptr exchange, string routingKey) { + if (exchange.get() == 0) + return; + intrusive_ptr<Message> msg (new Message ()); AMQFrame method (in_place<MessageTransferBody>( ProtocolVersion(), 0, exchange->getName (), 0, 0)); @@ -170,7 +180,6 @@ void ManagementAgent::SendBuffer (Buffer& buf, void ManagementAgent::PeriodicProcessing (void) { #define BUFSIZE 65536 -#define THRESHOLD 16384 RWlock::ScopedWlock writeLock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; @@ -186,18 +195,6 @@ void ManagementAgent::PeriodicProcessing (void) { ManagementObject::shared_ptr object = iter->second; - if (object->getSchemaNeeded ()) - { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'S'); - object->writeSchema (msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt.schema." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); @@ -239,17 +236,30 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { - size_t pos, start; + RWlock::ScopedRlock readLock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - uint32_t contentSize; - if (routingKey.compare (0, 7, "method.") != 0) + if (routingKey.compare (0, 13, "agent.method.") == 0) + dispatchMethod (msg, routingKey, 13); + + else if (routingKey.length () == 5 && + routingKey.compare (0, 5, "agent") == 0) + dispatchAgentCommand (msg); + + else { QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); return; } +} + +void ManagementAgent::dispatchMethod (Message& msg, + const string& routingKey, + size_t first) +{ + size_t pos, start = first; + uint32_t contentSize; - start = 7; if (routingKey.length () == start) { QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); @@ -279,13 +289,11 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, string methodName = routingKey.substr (start, routingKey.length () - start); contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > 65536) + if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) return; - char *inMem = new char[contentSize]; - char outMem[4096]; // TODO Fix This - Buffer inBuffer (inMem, contentSize); - Buffer outBuffer (outMem, 4096); + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; uint8_t opcode, unused; @@ -321,7 +329,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, return; } - EncodeHeader (outBuffer, 'R'); + EncodeHeader (outBuffer, 'm'); outBuffer.putLong (methodId); ManagementObjectMap::iterator iter = managementObjects.find (objId); @@ -335,9 +343,233 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, iter->second->doMethod (methodName, inBuffer, outBuffer); } - outLen = 4096 - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementAgent::handleHello (Buffer&, string replyToKey) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + uint8_t* dat = (uint8_t*) "Broker ID"; + EncodeHeader (outBuffer, 'I'); + outBuffer.putShort (9); + outBuffer.putRawData (dat, 9); + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey) +{ + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } +} + +void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + FindOrAddPackage (packageName); +} + +void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin (); + cIter != cMap.end (); + cIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q'); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } +} + +void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + SchemaClass classInfo = cIter->second; + + if (classInfo.writeSchemaCall != 0) + { + EncodeHeader (outBuffer, 's'); + classInfo.writeSchemaCall (outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + else + { + // TODO: Forward request to remote agent. + } + + clientAdded (); + // TODO: Send client-added to each remote agent. + } + } +} + +uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/) +{ + // TODO: Allow remote agents to keep their requested prefixes if able. + return nextRemotePrefix++; +} + +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey) +{ + string label; + uint32_t requestedPrefix; + uint32_t assignedPrefix; + + inBuffer.getShortString (label); + requestedPrefix = inBuffer.getLong (); + assignedPrefix = assignPrefix (requestedPrefix); + + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'a'); + outBuffer.putLong (assignedPrefix); + outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); - free (inMem); +} + +void ManagementAgent::dispatchAgentCommand (Message& msg) +{ + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode, unused; + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + return; + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + if (!CheckHeader (inBuffer, &opcode, &unused)) + return; + + if (opcode == 'H') handleHello (inBuffer, replyToKey); + else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey); + else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey); + else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey); + else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey); + else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey); +} + +ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert (pair<string, ClassMap> (name, ClassMap ())); + QPID_LOG (debug, "ManagementAgent added package " << name); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package"); + + return result.first; +} + +void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, + ManagementObject::shared_ptr object) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = object->getClassName (); + memcpy (&key.hash, object->getMd5Sum (), 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementAgent added class " << pIter->first << "." << + key.name); + SchemaClass classInfo; + + classInfo.writeSchemaCall = object->getWriteSchemaCall (); + cMap[key] = classInfo; + + // TODO: Publish a class-indication message +} + +void ManagementAgent::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementAgent::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 30b8857c27..2acbe124bd 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -27,6 +27,7 @@ #include "qpid/broker/Timer.h" #include "qpid/sys/Mutex.h" #include "ManagementObject.h" +#include <qpid/framing/AMQFrame.h> #include <boost/shared_ptr.hpp> namespace qpid { @@ -70,16 +71,76 @@ class ManagementAgent void fire (); }; + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent + { + std::string name; + uint64_t objIdBase; + }; + + // TODO: Eventually replace string with entire reply-to structure. reply-to + // currently assumes that the exchange is "amq.direct" even though it could + // in theory be specified differently. + typedef std::map<std::string, RemoteAgent> RemoteAgentMap; + typedef std::vector<std::string> ReplyToVector; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + ManagementObject::writeSchemaCall_t writeSchemaCall; + ReplyToVector remoteAgents; + + SchemaClass () : writeSchemaCall(0) {} + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + ManagementObjectMap managementObjects; + static shared_ptr agent; static bool enabled; qpid::sys::RWlock userLock; - ManagementObjectMap managementObjects; broker::Timer timer; broker::Exchange::shared_ptr mExchange; broker::Exchange::shared_ptr dExchange; uint16_t interval; uint64_t nextObjectId; + uint32_t nextRemotePrefix; + +# define MA_BUFFER_SIZE 65536 + char inputBuffer[MA_BUFFER_SIZE]; + char outputBuffer[MA_BUFFER_SIZE]; void PeriodicProcessing (void); void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0); @@ -88,6 +149,27 @@ class ManagementAgent uint32_t length, broker::Exchange::shared_ptr exchange, std::string routingKey); + + void dispatchMethod (broker::Message& msg, + const std::string& routingKey, + size_t first); + void dispatchAgentCommand (broker::Message& msg); + + PackageMap::iterator FindOrAddPackage (std::string name); + void AddClassLocal (PackageMap::iterator pIter, + ManagementObject::shared_ptr object); + void EncodePackageIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter); + void EncodeClassIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + uint32_t assignPrefix (uint32_t requestedPrefix); + void handleHello (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey); }; }} diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index ee18f026e7..c589aefba0 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -36,28 +36,15 @@ ManagementExchange::ManagementExchange (const std::string& _name, Exchange (_name, _durable, _args, _parent), TopicExchange(_name, _durable, _args, _parent) {} - -bool ManagementExchange::bind (Queue::shared_ptr queue, - const string& routingKey, - const FieldTable* args) -{ - bool result = TopicExchange::bind (queue, routingKey, args); - - // Notify the management agent that a new management client has bound to the - // exchange. - if (result) - managementAgent->clientAdded (); - - return result; -} - void ManagementExchange::route (Deliverable& msg, const string& routingKey, const FieldTable* args) { - // Intercept management commands - if (routingKey.length () > 7 && - routingKey.substr (0, 7).compare ("method.") == 0) + // Intercept management agent commands + if ((routingKey.length () > 6 && + routingKey.substr (0, 6).compare ("agent.") == 0) || + (routingKey.length () == 5 && + routingKey.substr (0, 5).compare ("agent") == 0)) { managementAgent->dispatchCommand (msg, routingKey, args); return; diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index 1a79482c9d..7faec32b0f 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -42,10 +42,6 @@ class ManagementExchange : public virtual TopicExchange virtual std::string getType() const { return typeName; } - virtual bool bind (Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args); - virtual void route (Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index c2d1f56be0..6af5412b99 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -29,7 +29,9 @@ using namespace qpid::sys; void ManagementObject::writeTimestamps (Buffer& buf) { - buf.putShortString (className); + buf.putShortString (getPackageName ()); + buf.putShortString (getClassName ()); + buf.putBin128 (getMd5Sum ()); buf.putLongLong (uint64_t (Duration (now ()))); buf.putLongLong (createTime); buf.putLongLong (destroyTime); diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index a32055721d..87c3ccf22a 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -44,8 +44,7 @@ class ManagementObject bool instChanged; bool deleted; Manageable* coreObject; - std::string className; - + static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; static const uint8_t TYPE_U32 = 3; @@ -56,6 +55,8 @@ class ManagementObject static const uint8_t TYPE_DELTATIME = 9; static const uint8_t TYPE_REF = 10; static const uint8_t TYPE_BOOL = 11; + static const uint8_t TYPE_FLOAT = 12; + static const uint8_t TYPE_DOUBLE = 13; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; @@ -73,23 +74,26 @@ class ManagementObject public: typedef boost::shared_ptr<ManagementObject> shared_ptr; + typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject (Manageable* _core, std::string _name) : + ManagementObject (Manageable* _core) : destroyTime(0), objectId (0), configChanged(true), - instChanged(true), deleted(false), coreObject(_core), className(_name) + instChanged(true), deleted(false), coreObject(_core) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} - virtual void writeSchema (qpid::framing::Buffer& buf) = 0; + virtual writeSchemaCall_t getWriteSchemaCall (void) = 0; + virtual bool firstInstance (void) = 0; virtual void writeConfig (qpid::framing::Buffer& buf) = 0; virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; - virtual bool getSchemaNeeded (void) = 0; - virtual void setSchemaNeeded (void) = 0; virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; - std::string getClassName (void) { return className; } + virtual std::string getClassName (void) = 0; + virtual std::string getPackageName (void) = 0; + virtual uint8_t* getMd5Sum (void) = 0; + void setObjectId (uint64_t oid) { objectId = oid; } uint64_t getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } @@ -108,7 +112,7 @@ class ManagementObject }; - typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap; +typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap; }} |