diff options
24 files changed, 1130 insertions, 408 deletions
diff --git a/qpid/cpp/managementgen/main.py b/qpid/cpp/managementgen/main.py index de8ce4cbe6..677c7321ae 100755 --- a/qpid/cpp/managementgen/main.py +++ b/qpid/cpp/managementgen/main.py @@ -28,6 +28,9 @@ usage = "usage: %prog [options] schema-document type-document template-director parser = OptionParser (usage=usage) parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE", help="Makefile fragment") +parser.add_option ("-i", "--include-prefix", dest="include_prefix", metavar="PATH", + default="qpid/management/", + help="Prefix for #include of generated headers in generated source, default: qpid/management/") (opts, args) = parser.parse_args () @@ -39,8 +42,11 @@ typefile = args[1] templatedir = args[2] outdir = args[3] +if opts.include_prefix == ".": + opts.include_prefix = None + gen = Generator (outdir, templatedir) -schema = PackageSchema (typefile, schemafile) +schema = PackageSchema (typefile, schemafile, opts) gen.makeClassFiles ("Class.h", schema) gen.makeClassFiles ("Class.cpp", schema) diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py index a459db7a47..fd76ba9112 100755 --- a/qpid/cpp/managementgen/schema.py +++ b/qpid/cpp/managementgen/schema.py @@ -21,6 +21,7 @@ from xml.dom.minidom import parse, parseString, Node from cStringIO import StringIO +import md5 #===================================================================================== # @@ -575,15 +576,18 @@ class SchemaEvent: def getArgCount (self): return len (self.args) -#===================================================================================== -# -#===================================================================================== + class SchemaClass: - def __init__ (self, node, typespec): + def __init__ (self, package, node, typespec, fragments, options): + self.packageName = package self.configElements = [] self.instElements = [] self.methods = [] self.events = [] + self.options = options + self.md5Sum = md5.new () + + self.hash (node) attrs = node.attributes self.name = attrs['name'].nodeValue @@ -607,9 +611,40 @@ class SchemaClass: sub = SchemaEvent (self, child, typespec) self.events.append (sub) + elif child.nodeName == 'group': + self.expandFragment (child, fragments) + else: raise ValueError ("Unknown class tag '%s'" % child.nodeName) + def hash (self, node): + attrs = node.attributes + self.md5Sum.update (node.nodeName) + + for idx in range (attrs.length): + self.md5Sum.update (attrs.item(idx).nodeName) + self.md5Sum.update (attrs.item(idx).nodeValue) + + for child in node.childNodes: + if child.nodeType == Node.ELEMENT_NODE: + self.hash (child) + + def expandFragment (self, node, fragments): + attrs = node.attributes + name = attrs['name'].nodeValue + for fragment in fragments: + if fragment.name == name: + for config in fragment.configElements: + self.configElements.append (config) + for inst in fragment.instElements: + self.instElements.append (inst) + for method in fragment.methods: + self.methods.append (method) + for event in fragment.events: + self.events.append (event) + return + raise ValueError ("Undefined group '%s'" % name) + def getName (self): return self.name @@ -644,13 +679,9 @@ class SchemaClass: def genConstructorArgs (self, stream, variables): # Constructor args are config elements with read-create access result = "" - first = 1 for element in self.configElements: if element.isConstructorArg (): - if first == 1: - first = 0 - else: - stream.write (", ") + stream.write (", ") element.genFormalParam (stream) def genConstructorInits (self, stream, variables): @@ -715,8 +746,8 @@ class SchemaClass: def genMethodArgIncludes (self, stream, variables): for method in self.methods: if method.getArgCount () > 0: - stream.write ("#include \"qpid/management/Args" +\ - method.getFullName () + ".h\"\n") + stream.write ("#include \"" + (self.options.include_prefix or "") +\ + "Args" + method.getFullName () + ".h\"\n") def genMethodCount (self, stream, variables): stream.write ("%d" % len (self.methods)) @@ -765,13 +796,16 @@ class SchemaClass: def genNameLower (self, stream, variables): stream.write (self.name.lower ()) + def genNamePackageLower (self, stream, variables): + stream.write (self.packageName.lower ()) + def genNameUpper (self, stream, variables): stream.write (self.name.upper ()) def genParentArg (self, stream, variables): for config in self.configElements: if config.isParentRef == 1: - stream.write (" _parent") + stream.write (", Manageable* _parent") return def genParentRefAssignment (self, stream, variables): @@ -781,6 +815,13 @@ class SchemaClass: " = _parent->GetManagementObject ()->getObjectId ();") return + def genSchemaMD5 (self, stream, variables): + sum = self.md5Sum.digest () + for idx in range (len (sum)): + if idx != 0: + stream.write (",") + stream.write (hex (ord (sum[idx]))) + def genWriteConfig (self, stream, variables): for config in self.configElements: config.genWrite (stream); @@ -790,14 +831,13 @@ class SchemaClass: inst.genWrite (stream); -#===================================================================================== -# -#===================================================================================== + class PackageSchema: - def __init__ (self, typefile, schemafile): + def __init__ (self, typefile, schemafile, options): - self.classes = [] - self.typespec = TypeSpec (typefile) + self.classes = [] + self.fragments = [] + self.typespec = TypeSpec (typefile) dom = parse (schemafile) document = dom.documentElement @@ -810,8 +850,15 @@ class PackageSchema: for child in children: if child.nodeType == Node.ELEMENT_NODE: if child.nodeName == 'class': - cls = SchemaClass (child, self.typespec) + cls = SchemaClass (self.packageName, child, self.typespec, + self.fragments, options) self.classes.append (cls) + + elif child.nodeName == 'group': + cls = SchemaClass (self.packageName, child, self.typespec, + self.fragments, options) + self.fragments.append (cls) + else: raise ValueError ("Unknown schema tag '%s'" % child.nodeName) diff --git a/qpid/cpp/managementgen/templates/Class.cpp b/qpid/cpp/managementgen/templates/Class.cpp index d87d11f767..2a3f71e262 100644 --- a/qpid/cpp/managementgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/templates/Class.cpp @@ -31,11 +31,14 @@ using namespace qpid::sys; using namespace qpid::framing; using std::string; -bool /*MGEN:Class.NameCap*/::schemaNeeded = true; - -/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core, Manageable*/*MGEN:Class.ParentArg*/, - /*MGEN:Class.ConstructorArgs*/) : - ManagementObject(_core, "/*MGEN:Class.NameLower*/") +string /*MGEN:Class.NameCap*/::packageName = string ("/*MGEN:Class.NamePackageLower*/"); +string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/"); +uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = + {/*MGEN:Class.SchemaMD5*/}; +bool /*MGEN:Class.NameCap*/::firstInst = true; + +/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : + ManagementObject(_core) /*MGEN:Class.ConstructorInits*/ { /*MGEN:Class.ParentRefAssignment*/ @@ -60,14 +63,26 @@ namespace { const string DEFAULT("default"); } +bool /*MGEN:Class.NameCap*/::firstInstance (void) +{ + Mutex::ScopedLock alock(accessorLock); + if (firstInst) + { + firstInst = false; + return true; + } + + return false; +} + void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) { FieldTable ft; - schemaNeeded = false; - // Schema class header: - buf.putShortString (className); // Class Name + buf.putShortString (packageName); // Package Name + buf.putShortString (className); // Class Name + buf.putBin128 (md5Sum); // Schema Hash buf.putShort (/*MGEN:Class.ConfigCount*/); // Config Element Count buf.putShort (/*MGEN:Class.InstCount*/); // Inst Element Count buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h index ba6a1183e2..82fac00d47 100644 --- a/qpid/cpp/managementgen/templates/Class.h +++ b/qpid/cpp/managementgen/templates/Class.h @@ -33,22 +33,24 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject { private: - static bool schemaNeeded; + static std::string packageName; + static std::string className; + static uint8_t md5Sum[16]; + static bool firstInst; // Configuration Elements /*MGEN:Class.ConfigDeclarations*/ // Instrumentation Elements /*MGEN:Class.InstDeclarations*/ // Private Methods - std::string getObjectName (void) { return "/*MGEN:Class.NameLower*/"; } - void writeSchema (qpid::framing::Buffer& buf); + static void writeSchema (qpid::framing::Buffer& buf); void writeConfig (qpid::framing::Buffer& buf); void writeInstrumentation (qpid::framing::Buffer& buf); - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf); + writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; } + bool firstInstance (void); /*MGEN:Class.InstChangedStub*/ public: @@ -56,10 +58,13 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; qpid::sys::Mutex accessorLock; - /*MGEN:Class.NameCap*/ (Manageable* coreObject, Manageable* parentObject, - /*MGEN:Class.ConstructorArgs*/); + /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); ~/*MGEN:Class.NameCap*/ (void); + std::string getPackageName (void) { return packageName; } + std::string getClassName (void) { return className; } + uint8_t* getMd5Sum (void) { return md5Sum; } + // Method IDs /*MGEN:Class.MethodIdDeclarations*/ // Accessor Methods diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index e3b95e045f..becccb4224 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -213,6 +213,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SessionContext.h \ qpid/broker/SessionHandler.cpp \ qpid/broker/SemanticHandler.cpp \ + qpid/broker/System.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ @@ -331,6 +332,7 @@ nobase_include_HEADERS = \ qpid/broker/RecoveryManagerImpl.h \ qpid/broker/SemanticHandler.h \ qpid/broker/SessionManager.h \ + qpid/broker/System.h \ qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 9bfa868d9c..8b70831cf7 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -131,13 +131,18 @@ Broker::Broker(const Broker::Options& conf) : managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); - mgmtObject = management::Broker::shared_ptr (new management::Broker (this, 0, 0, conf.port)); + System* system = new System (); + systemObject = System::shared_ptr (system); + + mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port)); mgmtObject->set_workerThreads (conf.workerThreads); mgmtObject->set_maxConns (conf.maxConnections); mgmtObject->set_connBacklog (conf.connectionBacklog); mgmtObject->set_stagingThreshold (conf.stagingThreshold); mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); mgmtObject->set_version (PACKAGE_VERSION); + mgmtObject->set_dataDirEnabled (dataDir.isEnabled ()); + mgmtObject->set_dataDir (dataDir.getPath ()); managementAgent->addObject (mgmtObject, 1, 0); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 153eabc6b3..9e5191825d 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -32,6 +32,7 @@ #include "SessionManager.h" #include "PreviewSessionManager.h" #include "Vhost.h" +#include "System.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qpid/management/Broker.h" @@ -142,6 +143,7 @@ class Broker : public sys::Runnable, public Plugin::Target, management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; + System::shared_ptr systemObject; void declareStandardExchange(const std::string& name, const std::string& type); }; diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp new file mode 100644 index 0000000000..87d5185b97 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -0,0 +1,48 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "System.h" +#include "qpid/management/ManagementAgent.h" +#include <sys/utsname.h> + +using namespace qpid::broker; +using qpid::management::ManagementAgent; + +System::System () +{ + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::System::shared_ptr + (new management::System (this, "host")); + struct utsname _uname; + if (uname (&_uname) == 0) + { + mgmtObject->set_osName (std::string (_uname.sysname)); + mgmtObject->set_nodeName (std::string (_uname.nodename)); + mgmtObject->set_release (std::string (_uname.release)); + mgmtObject->set_version (std::string (_uname.version)); + mgmtObject->set_machine (std::string (_uname.machine)); + } + + agent->addObject (mgmtObject, 3, 0); + } +} + diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h new file mode 100644 index 0000000000..a1a710f2b2 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/System.h @@ -0,0 +1,51 @@ +#ifndef _BrokerSystem_ +#define _BrokerSystem_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/System.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class System : public management::Manageable +{ + private: + + management::System::shared_ptr mgmtObject; + + public: + + typedef boost::shared_ptr<System> shared_ptr; + + System (); + + management::ManagementObject::shared_ptr GetManagementObject (void) const + { return mgmtObject; } + + management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) + { return management::Manageable::STATUS_OK; } +}; + +}} + +#endif /*!_BrokerSystem_*/ diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp index 7eadf377b9..c0cd210042 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.cpp +++ b/qpid/cpp/src/qpid/framing/Buffer.cpp @@ -74,6 +74,31 @@ void Buffer::putLongLong(uint64_t i){ putLong(lo); } +void Buffer::putFloat(float f){ + union { + uint32_t i; + float f; + } val; + + val.f = f; + putLong (val.i); +} + +void Buffer::putDouble(double f){ + union { + uint64_t i; + double f; + } val; + + val.f = f; + putLongLong (val.i); +} + +void Buffer::putBin128(uint8_t* b){ + memcpy (data + position, b, 16); + position += 16; +} + uint8_t Buffer::getOctet(){ return (uint8_t) data[position++]; } @@ -104,6 +129,24 @@ uint64_t Buffer::getLongLong(){ return hi | lo; } +float Buffer::getFloat(){ + union { + uint32_t i; + float f; + } val; + val.i = getLong(); + return val.f; +} + +double Buffer::getDouble(){ + union { + uint64_t i; + double f; + } val; + val.i = getLongLong(); + return val.f; +} + template <> uint64_t Buffer::getUInt<1>() { return getOctet(); @@ -172,6 +215,11 @@ void Buffer::getLongString(string& s){ position += len; } +void Buffer::getBin128(uint8_t* b){ + memcpy (b, data + position, 16); + position += 16; +} + void Buffer::putRawData(const string& s){ uint32_t len = s.length(); s.copy(data + position, len); diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h index 5ab897d351..d0ca41f82b 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.h +++ b/qpid/cpp/src/qpid/framing/Buffer.h @@ -57,11 +57,16 @@ public: void putShort(uint16_t i); void putLong(uint32_t i); void putLongLong(uint64_t i); + void putFloat(float f); + void putDouble(double f); + void putBin128(uint8_t* b); - uint8_t getOctet(); + uint8_t getOctet(); uint16_t getShort(); uint32_t getLong(); uint64_t getLongLong(); + float getFloat(); + double getDouble(); template <int n> uint64_t getUInt(); @@ -73,6 +78,7 @@ public: void putLongString(const string& s); void getShortString(string& s); void getLongString(string& s); + void getBin128(uint8_t* b); void putRawData(const string& s); void getRawData(string& s, uint32_t size); diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h index 155b71da54..1fb890c8c6 100644 --- a/qpid/cpp/src/qpid/management/Manageable.h +++ b/qpid/cpp/src/qpid/management/Manageable.h @@ -23,6 +23,7 @@ #include "ManagementObject.h" #include "Args.h" #include <string> +#include <boost/shared_ptr.hpp> namespace qpid { namespace management { diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 709f2a0ecd..bdbabbaf47 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -24,13 +24,13 @@ #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> #include <qpid/broker/MessageDelivery.h> -#include <qpid/framing/AMQFrame.h> #include <list> using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; using namespace qpid::sys; +using namespace std; ManagementAgent::shared_ptr ManagementAgent::agent; bool ManagementAgent::enabled = 0; @@ -39,6 +39,7 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); + nextRemotePrefix = 101; } ManagementAgent::~ManagementAgent () {} @@ -87,6 +88,15 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object, object->setObjectId (objectId); managementObjects[objectId] = object; + + // If we've already seen instances of this object type, we're done. + if (!object->firstInstance ()) + return; + + // This is the first object of this type that we've seen, update the schema + // inventory. + PackageMap::iterator pIter = FindOrAddPackage (object->getPackageName ()); + AddClassLocal (pIter, object); } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -102,15 +112,12 @@ void ManagementAgent::Periodic::fire () void ManagementAgent::clientAdded (void) { - RWlock::ScopedRlock readLock (userLock); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { ManagementObject::shared_ptr object = iter->second; - object->setAllChanged (); - object->setSchemaNeeded (); + object->setAllChanged (); } } @@ -142,6 +149,9 @@ void ManagementAgent::SendBuffer (Buffer& buf, broker::Exchange::shared_ptr exchange, string routingKey) { + if (exchange.get() == 0) + return; + intrusive_ptr<Message> msg (new Message ()); AMQFrame method (in_place<MessageTransferBody>( ProtocolVersion(), 0, exchange->getName (), 0, 0)); @@ -170,7 +180,6 @@ void ManagementAgent::SendBuffer (Buffer& buf, void ManagementAgent::PeriodicProcessing (void) { #define BUFSIZE 65536 -#define THRESHOLD 16384 RWlock::ScopedWlock writeLock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; @@ -186,18 +195,6 @@ void ManagementAgent::PeriodicProcessing (void) { ManagementObject::shared_ptr object = iter->second; - if (object->getSchemaNeeded ()) - { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'S'); - object->writeSchema (msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt.schema." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); @@ -239,17 +236,30 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { - size_t pos, start; + RWlock::ScopedRlock readLock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - uint32_t contentSize; - if (routingKey.compare (0, 7, "method.") != 0) + if (routingKey.compare (0, 13, "agent.method.") == 0) + dispatchMethod (msg, routingKey, 13); + + else if (routingKey.length () == 5 && + routingKey.compare (0, 5, "agent") == 0) + dispatchAgentCommand (msg); + + else { QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); return; } +} + +void ManagementAgent::dispatchMethod (Message& msg, + const string& routingKey, + size_t first) +{ + size_t pos, start = first; + uint32_t contentSize; - start = 7; if (routingKey.length () == start) { QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); @@ -279,13 +289,11 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, string methodName = routingKey.substr (start, routingKey.length () - start); contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > 65536) + if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) return; - char *inMem = new char[contentSize]; - char outMem[4096]; // TODO Fix This - Buffer inBuffer (inMem, contentSize); - Buffer outBuffer (outMem, 4096); + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; uint8_t opcode, unused; @@ -321,7 +329,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, return; } - EncodeHeader (outBuffer, 'R'); + EncodeHeader (outBuffer, 'm'); outBuffer.putLong (methodId); ManagementObjectMap::iterator iter = managementObjects.find (objId); @@ -335,9 +343,233 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, iter->second->doMethod (methodName, inBuffer, outBuffer); } - outLen = 4096 - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementAgent::handleHello (Buffer&, string replyToKey) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + uint8_t* dat = (uint8_t*) "Broker ID"; + EncodeHeader (outBuffer, 'I'); + outBuffer.putShort (9); + outBuffer.putRawData (dat, 9); + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey) +{ + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } +} + +void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + FindOrAddPackage (packageName); +} + +void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin (); + cIter != cMap.end (); + cIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q'); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } +} + +void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + SchemaClass classInfo = cIter->second; + + if (classInfo.writeSchemaCall != 0) + { + EncodeHeader (outBuffer, 's'); + classInfo.writeSchemaCall (outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + else + { + // TODO: Forward request to remote agent. + } + + clientAdded (); + // TODO: Send client-added to each remote agent. + } + } +} + +uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/) +{ + // TODO: Allow remote agents to keep their requested prefixes if able. + return nextRemotePrefix++; +} + +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey) +{ + string label; + uint32_t requestedPrefix; + uint32_t assignedPrefix; + + inBuffer.getShortString (label); + requestedPrefix = inBuffer.getLong (); + assignedPrefix = assignPrefix (requestedPrefix); + + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'a'); + outBuffer.putLong (assignedPrefix); + outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); - free (inMem); +} + +void ManagementAgent::dispatchAgentCommand (Message& msg) +{ + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode, unused; + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + return; + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + if (!CheckHeader (inBuffer, &opcode, &unused)) + return; + + if (opcode == 'H') handleHello (inBuffer, replyToKey); + else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey); + else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey); + else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey); + else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey); + else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey); +} + +ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert (pair<string, ClassMap> (name, ClassMap ())); + QPID_LOG (debug, "ManagementAgent added package " << name); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package"); + + return result.first; +} + +void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, + ManagementObject::shared_ptr object) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = object->getClassName (); + memcpy (&key.hash, object->getMd5Sum (), 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementAgent added class " << pIter->first << "." << + key.name); + SchemaClass classInfo; + + classInfo.writeSchemaCall = object->getWriteSchemaCall (); + cMap[key] = classInfo; + + // TODO: Publish a class-indication message +} + +void ManagementAgent::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementAgent::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 30b8857c27..2acbe124bd 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -27,6 +27,7 @@ #include "qpid/broker/Timer.h" #include "qpid/sys/Mutex.h" #include "ManagementObject.h" +#include <qpid/framing/AMQFrame.h> #include <boost/shared_ptr.hpp> namespace qpid { @@ -70,16 +71,76 @@ class ManagementAgent void fire (); }; + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent + { + std::string name; + uint64_t objIdBase; + }; + + // TODO: Eventually replace string with entire reply-to structure. reply-to + // currently assumes that the exchange is "amq.direct" even though it could + // in theory be specified differently. + typedef std::map<std::string, RemoteAgent> RemoteAgentMap; + typedef std::vector<std::string> ReplyToVector; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + ManagementObject::writeSchemaCall_t writeSchemaCall; + ReplyToVector remoteAgents; + + SchemaClass () : writeSchemaCall(0) {} + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + ManagementObjectMap managementObjects; + static shared_ptr agent; static bool enabled; qpid::sys::RWlock userLock; - ManagementObjectMap managementObjects; broker::Timer timer; broker::Exchange::shared_ptr mExchange; broker::Exchange::shared_ptr dExchange; uint16_t interval; uint64_t nextObjectId; + uint32_t nextRemotePrefix; + +# define MA_BUFFER_SIZE 65536 + char inputBuffer[MA_BUFFER_SIZE]; + char outputBuffer[MA_BUFFER_SIZE]; void PeriodicProcessing (void); void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0); @@ -88,6 +149,27 @@ class ManagementAgent uint32_t length, broker::Exchange::shared_ptr exchange, std::string routingKey); + + void dispatchMethod (broker::Message& msg, + const std::string& routingKey, + size_t first); + void dispatchAgentCommand (broker::Message& msg); + + PackageMap::iterator FindOrAddPackage (std::string name); + void AddClassLocal (PackageMap::iterator pIter, + ManagementObject::shared_ptr object); + void EncodePackageIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter); + void EncodeClassIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + uint32_t assignPrefix (uint32_t requestedPrefix); + void handleHello (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey); }; }} diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.cpp b/qpid/cpp/src/qpid/management/ManagementExchange.cpp index ee18f026e7..c589aefba0 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementExchange.cpp @@ -36,28 +36,15 @@ ManagementExchange::ManagementExchange (const std::string& _name, Exchange (_name, _durable, _args, _parent), TopicExchange(_name, _durable, _args, _parent) {} - -bool ManagementExchange::bind (Queue::shared_ptr queue, - const string& routingKey, - const FieldTable* args) -{ - bool result = TopicExchange::bind (queue, routingKey, args); - - // Notify the management agent that a new management client has bound to the - // exchange. - if (result) - managementAgent->clientAdded (); - - return result; -} - void ManagementExchange::route (Deliverable& msg, const string& routingKey, const FieldTable* args) { - // Intercept management commands - if (routingKey.length () > 7 && - routingKey.substr (0, 7).compare ("method.") == 0) + // Intercept management agent commands + if ((routingKey.length () > 6 && + routingKey.substr (0, 6).compare ("agent.") == 0) || + (routingKey.length () == 5 && + routingKey.substr (0, 5).compare ("agent") == 0)) { managementAgent->dispatchCommand (msg, routingKey, args); return; diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.h b/qpid/cpp/src/qpid/management/ManagementExchange.h index 1a79482c9d..7faec32b0f 100644 --- a/qpid/cpp/src/qpid/management/ManagementExchange.h +++ b/qpid/cpp/src/qpid/management/ManagementExchange.h @@ -42,10 +42,6 @@ class ManagementExchange : public virtual TopicExchange virtual std::string getType() const { return typeName; } - virtual bool bind (Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args); - virtual void route (Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index c2d1f56be0..6af5412b99 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -29,7 +29,9 @@ using namespace qpid::sys; void ManagementObject::writeTimestamps (Buffer& buf) { - buf.putShortString (className); + buf.putShortString (getPackageName ()); + buf.putShortString (getClassName ()); + buf.putBin128 (getMd5Sum ()); buf.putLongLong (uint64_t (Duration (now ()))); buf.putLongLong (createTime); buf.putLongLong (destroyTime); diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index a32055721d..87c3ccf22a 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -44,8 +44,7 @@ class ManagementObject bool instChanged; bool deleted; Manageable* coreObject; - std::string className; - + static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; static const uint8_t TYPE_U32 = 3; @@ -56,6 +55,8 @@ class ManagementObject static const uint8_t TYPE_DELTATIME = 9; static const uint8_t TYPE_REF = 10; static const uint8_t TYPE_BOOL = 11; + static const uint8_t TYPE_FLOAT = 12; + static const uint8_t TYPE_DOUBLE = 13; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; @@ -73,23 +74,26 @@ class ManagementObject public: typedef boost::shared_ptr<ManagementObject> shared_ptr; + typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject (Manageable* _core, std::string _name) : + ManagementObject (Manageable* _core) : destroyTime(0), objectId (0), configChanged(true), - instChanged(true), deleted(false), coreObject(_core), className(_name) + instChanged(true), deleted(false), coreObject(_core) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} - virtual void writeSchema (qpid::framing::Buffer& buf) = 0; + virtual writeSchemaCall_t getWriteSchemaCall (void) = 0; + virtual bool firstInstance (void) = 0; virtual void writeConfig (qpid::framing::Buffer& buf) = 0; virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; - virtual bool getSchemaNeeded (void) = 0; - virtual void setSchemaNeeded (void) = 0; virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; - std::string getClassName (void) { return className; } + virtual std::string getClassName (void) = 0; + virtual std::string getPackageName (void) = 0; + virtual uint8_t* getMd5Sum (void) = 0; + void setObjectId (uint64_t oid) { objectId = oid; } uint64_t getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } @@ -108,7 +112,7 @@ class ManagementObject }; - typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap; +typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap; }} diff --git a/qpid/python/mgmt-cli/main.py b/qpid/python/mgmt-cli/main.py index 76e1f25c14..f4c22012eb 100755 --- a/qpid/python/mgmt-cli/main.py +++ b/qpid/python/mgmt-cli/main.py @@ -104,7 +104,10 @@ class Mcli (Cmd): self.dataObject.do_list (data) def do_call (self, data): - self.dataObject.do_call (data) + try: + self.dataObject.do_call (data) + except ValueError, e: + print "ValueError:", e def do_EOF (self, data): print "quit" @@ -121,7 +124,10 @@ class Mcli (Cmd): self.dataObject.close () def Usage (): - print sys.argv[0], "[<target-host> [<tcp-port>]]" + print "Usage:", sys.argv[0], "[OPTIONS] [<target-host> [<tcp-port>]]" + print + print "Options:" + print " -s <amqp-spec-file> default: /usr/share/amqp/amqp.0-10-preview.xml" print sys.exit (1) @@ -134,13 +140,15 @@ try: (optlist, cargs) = getopt.getopt (sys.argv[1:], 's:') except: Usage () + exit (1) specpath = "/usr/share/amqp/amqp.0-10-preview.xml" host = "localhost" port = 5672 -if "s" in optlist: - specpath = optlist["s"] +for opt in optlist: + if opt[0] == "-s": + specpath = opt[1] if len (cargs) > 0: host = cargs[0] @@ -148,19 +156,27 @@ if len (cargs) > 0: if len (cargs) > 1: port = int (cargs[1]) -print ("Management Tool for QPID") disp = Display () # Attempt to make a connection to the target broker try: - data = ManagementData (disp, host, port, spec=specpath) + data = ManagementData (disp, host, port, specfile=specpath) except socket.error, e: - sys.exit (0) + print "Socket Error:", e[1] + sys.exit (1) except Closed, e: if str(e).find ("Exchange not found") != -1: print "Management not enabled on broker: Use '-m yes' option on broker startup." - sys.exit (0) + sys.exit (1) +except IOError, e: + print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename) + sys.exit (1) # Instantiate the CLI interpreter and launch it. cli = Mcli (data, disp) -cli.cmdloop () +print ("Management Tool for QPID") +try: + cli.cmdloop () +except Closed, e: + print "Connection to Broker Lost:", e + exit (1) diff --git a/qpid/python/mgmt-cli/managementdata.py b/qpid/python/mgmt-cli/managementdata.py index e7233c98ae..5b13594994 100644 --- a/qpid/python/mgmt-cli/managementdata.py +++ b/qpid/python/mgmt-cli/managementdata.py @@ -19,10 +19,12 @@ # under the License. # -from qpid.management import ManagedBroker +import qpid +from qpid.management import managementChannel, managementClient from threading import Lock from disp import Display from shlex import split +from qpid.client import Client class ManagementData: @@ -35,9 +37,10 @@ class ManagementData: # The only historical data it keeps are the high and low watermarks # for hi-lo statistics. # - # tables :== {<class-name>} + # tables :== {class-key} # {<obj-id>} # (timestamp, config-record, inst-record) + # class-key :== (<package-name>, <class-name>, <class-hash>) # timestamp :== (<last-interval-time>, <create-time>, <delete-time>) # config-record :== [element] # inst-record :== [element] @@ -59,6 +62,10 @@ class ManagementData: return displayId + self.baseId return displayId - 5000 + 0x8000000000000000L + def displayClassName (self, cls): + (packageName, className, hash) = cls + return packageName + "." + className + def dataHandler (self, context, className, list, timestamps): """ Callback for configuration and instrumentation data updates """ self.lock.acquire () @@ -104,6 +111,12 @@ class ManagementData: finally: self.lock.release () + def configHandler (self, context, className, list, timestamps): + self.dataHandler (0, className, list, timestamps); + + def instHandler (self, context, className, list, timestamps): + self.dataHandler (1, className, list, timestamps); + def methodReply (self, broker, sequence, status, sText, args): """ Callback for method-reply messages """ self.lock.acquire () @@ -121,12 +134,8 @@ class ManagementData: self.schema[className] = (configs, insts, methods, events) def __init__ (self, disp, host, port=5672, username="guest", password="guest", - spec="../../specs/amqp.0-10-preview.xml"): - self.broker = ManagedBroker (host, port, username, password, spec) - self.broker.configListener (0, self.dataHandler) - self.broker.instrumentationListener (1, self.dataHandler) - self.broker.methodListener (None, self.methodReply) - self.broker.schemaListener (None, self.schemaHandler) + specfile="../../specs/amqp.0-10-preview.xml"): + self.spec = qpid.spec.load (specfile) self.lock = Lock () self.tables = {} self.schema = {} @@ -135,24 +144,33 @@ class ManagementData: self.lastUnit = None self.methodSeq = 1 self.methodsPending = {} - self.broker.start () + + self.client = Client (host, port, self.spec) + self.client.start ({"LOGIN": username, "PASSWORD": password}) + self.channel = self.client.channel (1) + + self.mclient = managementClient (self.spec, None, self.configHandler, + self.instHandler, self.methodReply) + self.mclient.schemaListener (self.schemaHandler) + self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb) + self.mclient.addChannel (self.mch) def close (self): - self.broker.stop () + self.mclient.removeChannel (self.mch) def refName (self, oid): if oid == 0: return "NULL" return str (self.displayObjId (oid)) - def valueDisplay (self, className, key, value): + def valueDisplay (self, classKey, key, value): for kind in range (2): - schema = self.schema[className][kind] + schema = self.schema[classKey][kind] for item in schema: if item[0] == key: typecode = item[1] unit = item[2] - if typecode >= 1 and typecode <= 5: # numerics + if (typecode >= 1 and typecode <= 5) or typecode >= 12: # numerics if unit == None or unit == self.lastUnit: return str (value) else: @@ -191,6 +209,20 @@ class ManagementData: result = result + self.valueDisplay (className, key, val) return result + def getClassKey (self, className): + dotPos = className.find(".") + if dotPos == -1: + for key in self.schema: + if key[1] == className: + return key + else: + package = className[0:dotPos] + name = className[dotPos + 1:] + for key in self.schema: + if key[0] == package and key[1] == name: + return key + return None + def classCompletions (self, prefix): """ Provide a list of candidate class names for command completion """ self.lock.acquire () @@ -227,6 +259,10 @@ class ManagementData: return "reference" elif typecode == 11: return "boolean" + elif typecode == 12: + return "float" + elif typecode == 13: + return "double" else: raise ValueError ("Invalid type code: %d" % typecode) @@ -253,16 +289,16 @@ class ManagementData: return False return True - def listOfIds (self, className, tokens): + def listOfIds (self, classKey, tokens): """ Generate a tuple of object ids for a classname based on command tokens. """ list = [] if tokens[0] == "all": - for id in self.tables[className]: + for id in self.tables[classKey]: list.append (self.displayObjId (id)) elif tokens[0] == "active": - for id in self.tables[className]: - if self.tables[className][id][0][2] == 0: + for id in self.tables[classKey]: + if self.tables[classKey][id][0][2] == 0: list.append (self.displayObjId (id)) else: @@ -271,7 +307,7 @@ class ManagementData: if token.find ("-") != -1: ids = token.split("-", 2) for id in range (int (ids[0]), int (ids[1]) + 1): - if self.getClassForId (self.rawObjId (long (id))) == className: + if self.getClassForId (self.rawObjId (long (id))) == classKey: list.append (id) else: list.append (token) @@ -301,7 +337,7 @@ class ManagementData: deleted = deleted + 1 else: active = active + 1 - rows.append ((name, active, deleted)) + rows.append ((self.displayClassName (name), active, deleted)) self.disp.table ("Management Object Types:", ("ObjectType", "Active", "Deleted"), rows) finally: @@ -311,22 +347,23 @@ class ManagementData: """ Generate a display of a list of objects in a class """ self.lock.acquire () try: - if className not in self.tables: + classKey = self.getClassKey (className) + if classKey == None: print ("Object type %s not known" % className) else: rows = [] - sorted = self.tables[className].keys () + sorted = self.tables[classKey].keys () sorted.sort () for objId in sorted: - (ts, config, inst) = self.tables[className][objId] + (ts, config, inst) = self.tables[classKey][objId] createTime = self.disp.timestamp (ts[1]) destroyTime = "-" if ts[2] > 0: destroyTime = self.disp.timestamp (ts[2]) - objIndex = self.getObjIndex (className, config) + objIndex = self.getObjIndex (classKey, config) row = (self.refName (objId), createTime, destroyTime, objIndex) rows.append (row) - self.disp.table ("Objects of type %s" % className, + self.disp.table ("Objects of type %s.%s" % (classKey[0], classKey[1]), ("ID", "Created", "Destroyed", "Index"), rows) finally: @@ -343,57 +380,57 @@ class ManagementData: else: rootId = int (tokens[0]) - className = self.getClassForId (self.rawObjId (rootId)) + classKey = self.getClassForId (self.rawObjId (rootId)) remaining = tokens - if className == None: + if classKey == None: print "Id not known: %d" % int (tokens[0]) raise ValueError () else: - className = tokens[0] + classKey = self.getClassKey (tokens[0]) remaining = tokens[1:] - if className not in self.tables: - print "Class not known: %s" % className + if classKey not in self.tables: + print "Class not known: %s" % tokens[0] raise ValueError () - userIds = self.listOfIds (className, remaining) + userIds = self.listOfIds (classKey, remaining) if len (userIds) == 0: print "No object IDs supplied" raise ValueError () ids = [] for id in userIds: - if self.getClassForId (self.rawObjId (long (id))) == className: + if self.getClassForId (self.rawObjId (long (id))) == classKey: ids.append (self.rawObjId (long (id))) rows = [] timestamp = None - config = self.tables[className][ids[0]][1] + config = self.tables[classKey][ids[0]][1] for eIdx in range (len (config)): key = config[eIdx][0] if key != "id": row = ("config", key) for id in ids: if timestamp == None or \ - timestamp < self.tables[className][id][0][0]: - timestamp = self.tables[className][id][0][0] - (key, value) = self.tables[className][id][1][eIdx] - row = row + (self.valueDisplay (className, key, value),) + timestamp < self.tables[classKey][id][0][0]: + timestamp = self.tables[classKey][id][0][0] + (key, value) = self.tables[classKey][id][1][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) - inst = self.tables[className][ids[0]][2] + inst = self.tables[classKey][ids[0]][2] for eIdx in range (len (inst)): key = inst[eIdx][0] if key != "id": row = ("inst", key) for id in ids: - (key, value) = self.tables[className][id][2][eIdx] - row = row + (self.valueDisplay (className, key, value),) + (key, value) = self.tables[classKey][id][2][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) titleRow = ("Type", "Element") for id in ids: titleRow = titleRow + (self.refName (id),) - caption = "Object of type %s:" % className + caption = "Object of type %s.%s:" % (classKey[0], classKey[1]) if timestamp != None: caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")" self.disp.table (caption, titleRow, rows) @@ -423,12 +460,13 @@ class ManagementData: """ Generate a display of details of the schema of a particular class """ self.lock.acquire () try: - if className not in self.schema: + classKey = self.getClassKey (className) + if classKey == None: print ("Class name %s not known" % className) raise ValueError () rows = [] - for config in self.schema[className][0]: + for config in self.schema[classKey][0]: name = config[0] if name != "id": typename = self.typeName(config[1]) @@ -446,7 +484,7 @@ class ManagementData: extra = extra + "MaxLen: " + str (config[8]) rows.append ((name, typename, unit, access, extra, desc)) - for config in self.schema[className][1]: + for config in self.schema[classKey][1]: name = config[0] if name != "id": typename = self.typeName(config[1]) @@ -455,10 +493,10 @@ class ManagementData: rows.append ((name, typename, unit, "", "", desc)) titles = ("Element", "Type", "Unit", "Access", "Notes", "Description") - self.disp.table ("Schema for class '%s':" % className, titles, rows) + self.disp.table ("Schema for class '%s.%s':" % (classKey[0], classKey[1]), titles, rows) - for mname in self.schema[className][2]: - (mdesc, args) = self.schema[className][2][mname] + for mname in self.schema[classKey][2]: + (mdesc, args) = self.schema[classKey][2][mname] caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc)) rows = [] for arg in args: @@ -485,25 +523,25 @@ class ManagementData: self.lock.release () def getClassForId (self, objId): - """ Given an object ID, return the class name for the referenced object """ - for className in self.tables: - if objId in self.tables[className]: - return className + """ Given an object ID, return the class key for the referenced object """ + for classKey in self.tables: + if objId in self.tables[classKey]: + return classKey return None def callMethod (self, userOid, methodName, args): self.lock.acquire () methodOk = True try: - className = self.getClassForId (self.rawObjId (userOid)) - if className == None: + classKey = self.getClassForId (self.rawObjId (userOid)) + if classKey == None: raise ValueError () - if methodName not in self.schema[className][2]: - print "Method '%s' not valid for class '%s'" % (methodName, className) + if methodName not in self.schema[classKey][2]: + print "Method '%s' not valid for class '%s.%s'" % (methodName, classKey[0], classKey[1]) raise ValueError () - schemaMethod = self.schema[className][2][methodName] + schemaMethod = self.schema[classKey][2][methodName] if len (args) != len (schemaMethod[1]): print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args)) raise ValueError () @@ -519,8 +557,8 @@ class ManagementData: self.lock.release () if methodOk: # try: - self.broker.method (self.methodSeq, self.rawObjId (userOid), className, - methodName, namedArgs) + self.mclient.callMethod (self.mch, self.methodSeq, self.rawObjId (userOid), classKey, + methodName, namedArgs) # except ValueError, e: # print "Error invoking method:", e diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py index b25de11f11..1a9372455d 100644 --- a/qpid/python/qpid/codec.py +++ b/qpid/python/qpid/codec.py @@ -265,6 +265,38 @@ class Codec: """ return self.unpack("!Q") + def encode_float(self, o): + self.pack("!f", o) + + def decode_float(self): + return self.unpack("!f") + + def encode_double(self, o): + self.pack("!d", o) + + def decode_double(self): + return self.unpack("!d") + + def encode_bin128(self, b): + for idx in range (0,16): + self.pack("!B", ord (b[idx])) + + def decode_bin128(self): + result = "" + for idx in range (0,16): + result = result + chr (self.unpack("!B")) + return result + + def encode_raw(self, len, b): + for idx in range (0,len): + self.pack("!B", b[idx]) + + def decode_raw(self, len): + result = "" + for idx in range (0,len): + result = result + chr (self.unpack("!B")) + return result + def enc_str(self, fmt, s): """ encodes a string 's' in network byte order as per format 'fmt' diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index 40de2a5298..b5d992cf5d 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -35,12 +35,14 @@ from threading import Lock class SequenceManager: + """ Manage sequence numbers for asynchronous method calls """ def __init__ (self): self.lock = Lock () self.sequence = 0 self.pending = {} def reserve (self, data): + """ Reserve a unique sequence number """ self.lock.acquire () result = self.sequence self.sequence = self.sequence + 1 @@ -49,6 +51,7 @@ class SequenceManager: return result def release (self, seq): + """ Release a reserved sequence number """ data = None self.lock.acquire () if seq in self.pending: @@ -57,12 +60,172 @@ class SequenceManager: self.lock.release () return data -class ManagementMetadata: - """One instance of this class is created for each ManagedBroker. It - is used to store metadata from the broker which is needed for the - proper interpretation of received management content.""" + +class managementChannel: + """ This class represents a connection to an AMQP broker. """ + + def __init__ (self, ch, topicCb, replyCb, cbContext=None): + """ Given a channel on an established AMQP broker connection, this method + opens a session and performs all of the declarations and bindings needed + to participate in the management protocol. """ + response = ch.session_open (detached_lifetime=300) + self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id) + self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id) + self.qpidChannel = ch + self.tcb = topicCb + self.rcb = replyCb + self.context = cbContext + + ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1) + ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1) + + ch.queue_bind (exchange="qpid.management", + queue=self.topicName, routing_key="mgmt.#") + ch.queue_bind (exchange="amq.direct", + queue=self.replyName, routing_key=self.replyName) + ch.message_subscribe (queue=self.topicName, destination="tdest") + ch.message_subscribe (queue=self.replyName, destination="rdest") + + ch.client.queue ("tdest").listen (self.topicCb) + ch.client.queue ("rdest").listen (self.replyCb) + + ch.message_flow_mode (destination="tdest", mode=1) + ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF) + ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF) + + ch.message_flow_mode (destination="rdest", mode=1) + ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) + ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) + + def topicCb (self, msg): + """ Receive messages via the topic queue on this channel. """ + self.tcb (self, msg) + + def replyCb (self, msg): + """ Receive messages via the reply queue on this channel. """ + self.rcb (self, msg) + + def send (self, exchange, msg): + self.qpidChannel.message_transfer (destination=exchange, content=msg) + + +class managementClient: + """ This class provides an API for access to management data on the AMQP + network. It implements the management protocol and manages the management + schemas as advertised by the various management agents in the network. """ + + #======================================================== + # User API - interacts with the class's user + #======================================================== + def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None): + self.spec = amqpSpec + self.ctrlCb = ctrlCb + self.configCb = configCb + self.instCb = instCb + self.methodCb = methodCb + self.schemaCb = None + self.eventCb = None + self.channels = [] + self.seqMgr = SequenceManager () + self.schema = {} + self.packages = {} + + def schemaListener (self, schemaCb): + """ Optionally register a callback to receive details of the schema of + managed objects in the network. """ + self.schemaCb = schemaCb + + def eventListener (self, eventCb): + """ Optionally register a callback to receive events from managed objects + in the network. """ + self.eventCb = eventCb + + def addChannel (self, channel): + """ Register a new channel. """ + self.channels.append (channel) + codec = Codec (StringIO (), self.spec) + self.setHeader (codec, ord ('H')) + msg = Content (codec.stream.getvalue ()) + msg["content_type"] = "application/octet-stream" + msg["routing_key"] = "agent" + msg["reply_to"] = self.spec.struct ("reply_to") + msg["reply_to"]["exchange_name"] = "amq.direct" + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) + + def removeChannel (self, channel): + """ Remove a previously added channel from management. """ + self.channels.remove (channel) + + def callMethod (self, channel, userSequence, objId, className, methodName, args=None): + """ Invoke a method on a managed object. """ + self.method (channel, userSequence, objId, className, methodName, args) + + #======================================================== + # Channel API - interacts with registered channel objects + #======================================================== + def topicCb (self, ch, msg): + """ Receive messages via the topic queue of a particular channel. """ + codec = Codec (StringIO (msg.content.body), self.spec) + hdr = self.checkHeader (codec) + if hdr == None: + raise ValueError ("outer header invalid"); + self.parse (ch, codec, hdr[0], hdr[1]) + msg.complete () + + def replyCb (self, ch, msg): + """ Receive messages via the reply queue of a particular channel. """ + codec = Codec (StringIO (msg.content.body), self.spec) + hdr = self.checkHeader (codec) + if hdr == None: + msg.complete () + return + + if hdr[0] == 'm': + self.handleMethodReply (ch, codec) + elif hdr[0] == 'I': + self.handleInit (ch, codec) + elif hdr[0] == 'p': + self.handlePackageInd (ch, codec) + elif hdr[0] == 'q': + self.handleClassInd (ch, codec) + else: + self.parse (ch, codec, hdr[0], hdr[1]) + msg.complete () + + #======================================================== + # Internal Functions + #======================================================== + def setHeader (self, codec, opcode, cls = 0): + """ Compose the header of a management message. """ + codec.encode_octet (ord ('A')) + codec.encode_octet (ord ('M')) + codec.encode_octet (ord ('0')) + codec.encode_octet (ord ('1')) + codec.encode_octet (opcode) + codec.encode_octet (cls) + + def checkHeader (self, codec): + """ Check the header of a management message and extract the opcode and + class. """ + octet = chr (codec.decode_octet ()) + if octet != 'A': + return None + octet = chr (codec.decode_octet ()) + if octet != 'M': + return None + octet = chr (codec.decode_octet ()) + if octet != '0': + return None + octet = chr (codec.decode_octet ()) + if octet != '1': + return None + opcode = chr (codec.decode_octet ()) + cls = chr (codec.decode_octet ()) + return (opcode, cls) def encodeValue (self, codec, value, typecode): + """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.encode_octet (int (value)) elif typecode == 2: @@ -85,10 +248,15 @@ class ManagementMetadata: codec.encode_longlong (long (value)) elif typecode == 11: # BOOL codec.encode_octet (int (value)) + elif typecode == 12: # FLOAT + codec.encode_float (float (value)) + elif typecode == 13: # DOUBLE + codec.encode_double (double (value)) else: raise ValueError ("Invalid type code: %d" % typecode) def decodeValue (self, codec, typecode): + """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.decode_octet () elif typecode == 2: @@ -111,17 +279,119 @@ class ManagementMetadata: data = codec.decode_longlong () elif typecode == 11: # BOOL data = codec.decode_octet () + elif typecode == 12: # FLOAT + data = codec.decode_float () + elif typecode == 13: # DOUBLE + data = codec.decode_double () else: raise ValueError ("Invalid type code: %d" % typecode) return data + + def handleMethodReply (self, ch, codec): + sequence = codec.decode_long () + status = codec.decode_long () + sText = codec.decode_shortstr () + + data = self.seqMgr.release (sequence) + if data == None: + return + + (userSequence, classId, methodName) = data + args = {} + + if status == 0: + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None + for mname in ms: + (mdesc, margs) = ms[mname] + if mname == methodName: + arglist = margs + if arglist == None: + return + + for arg in arglist: + if arg[2].find("O") != -1: + args[arg[0]] = self.decodeValue (codec, arg[1]) + + if self.methodCb != None: + self.methodCb (ch.context, userSequence, status, sText, args) + + def handleInit (self, ch, codec): + len = codec.decode_short () + data = codec.decode_raw (len) + if self.ctrlCb != None: + self.ctrlCb (ch.context, len, data) + + # Send a package request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('P')) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) - def parseSchema (self, cls, codec): + def handlePackageInd (self, ch, codec): + pname = codec.decode_shortstr () + if pname not in self.packages: + self.packages[pname] = {} + + # Send a class request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('Q')) + sendCodec.encode_shortstr (pname) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) + + def handleClassInd (self, ch, codec): + pname = codec.decode_shortstr () + cname = codec.decode_shortstr () + hash = codec.decode_bin128 () + if pname not in self.packages: + return + + if (cname, hash) not in self.packages[pname]: + # Send a schema request + sendCodec = Codec (StringIO (), self.spec) + self.setHeader (sendCodec, ord ('S')) + sendCodec.encode_shortstr (pname) + sendCodec.encode_shortstr (cname) + sendCodec.encode_bin128 (hash) + smsg = Content (sendCodec.stream.getvalue ()) + smsg["content_type"] = "application/octet-stream" + smsg["routing_key"] = "agent" + smsg["reply_to"] = self.spec.struct ("reply_to") + smsg["reply_to"]["exchange_name"] = "amq.direct" + smsg["reply_to"]["routing_key"] = ch.replyName + ch.send ("qpid.management", smsg) + + def parseSchema (self, ch, cls, codec): + """ Parse a received schema-description message. """ + packageName = codec.decode_shortstr () className = codec.decode_shortstr () + hash = codec.decode_bin128 () configCount = codec.decode_short () instCount = codec.decode_short () methodCount = codec.decode_short () eventCount = codec.decode_short () + if packageName not in self.packages: + return + if (className, hash) in self.packages[packageName]: + return + + classKey = (packageName, className, hash) + if classKey in self.schema: + return + configs = [] insts = [] methods = {} @@ -213,25 +483,29 @@ class ManagementMetadata: args.append (arg) methods[mname] = (mdesc, args) + schemaClass = {} + schemaClass['C'] = configs + schemaClass['I'] = insts + schemaClass['M'] = methods + schemaClass['E'] = events + self.schema[classKey] = schemaClass - self.schema[(className,'C')] = configs - self.schema[(className,'I')] = insts - self.schema[(className,'M')] = methods - self.schema[(className,'E')] = events - - if self.broker.schema_cb != None: - self.broker.schema_cb[1] (self.broker.schema_cb[0], className, - configs, insts, methods, events) + if self.schemaCb != None: + self.schemaCb (ch.context, classKey, configs, insts, methods, events) - def parseContent (self, cls, codec): - if cls == 'C' and self.broker.config_cb == None: + def parseContent (self, ch, cls, codec): + """ Parse a received content message. """ + if cls == 'C' and self.configCb == None: return - if cls == 'I' and self.broker.inst_cb == None: + if cls == 'I' and self.instCb == None: return - className = codec.decode_shortstr () + packageName = codec.decode_shortstr () + className = codec.decode_shortstr () + hash = codec.decode_bin128 () + classKey = (packageName, className, hash) - if (className,cls) not in self.schema: + if classKey not in self.schema: return row = [] @@ -241,184 +515,49 @@ class ManagementMetadata: timestamps.append (codec.decode_longlong ()) # Create Time timestamps.append (codec.decode_longlong ()) # Delete Time - for element in self.schema[(className,cls)][:]: + schemaClass = self.schema[classKey] + for element in schemaClass[cls][:]: tc = element[1] name = element[0] data = self.decodeValue (codec, tc) row.append ((name, data)) - if cls == 'C': - self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps) + if cls == 'C': + self.configCb (ch.context, classKey, row, timestamps) elif cls == 'I': - self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) - - def parse (self, codec, opcode, cls): - if opcode == 'S': - self.parseSchema (cls, codec) + self.instCb (ch.context, classKey, row, timestamps) + def parse (self, ch, codec, opcode, cls): + """ Parse a message received from the topic queue. """ + if opcode == 's': + self.parseSchema (ch, cls, codec) elif opcode == 'C': - self.parseContent (cls, codec) - + self.parseContent (ch, cls, codec) else: raise ValueError ("Unknown opcode: %c" % opcode); - def __init__ (self, broker): - self.broker = broker - self.schema = {} - - -class ManagedBroker: - """An object of this class represents a connection (over AMQP) to a - single managed broker.""" - - mExchange = "qpid.management" - dExchange = "amq.direct" - - def setHeader (self, codec, opcode, cls = 0): - codec.encode_octet (ord ('A')) - codec.encode_octet (ord ('M')) - codec.encode_octet (ord ('0')) - codec.encode_octet (ord ('1')) - codec.encode_octet (opcode) - codec.encode_octet (cls) - - def checkHeader (self, codec): - octet = chr (codec.decode_octet ()) - if octet != 'A': - return None - octet = chr (codec.decode_octet ()) - if octet != 'M': - return None - octet = chr (codec.decode_octet ()) - if octet != '0': - return None - octet = chr (codec.decode_octet ()) - if octet != '1': - return None - opcode = chr (codec.decode_octet ()) - cls = chr (codec.decode_octet ()) - return (opcode, cls) - - def publish_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - - hdr = self.checkHeader (codec) - if hdr == None: - raise ValueError ("outer header invalid"); - - self.metadata.parse (codec, hdr[0], hdr[1]) - msg.complete () - - def reply_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - hdr = self.checkHeader (codec) - if hdr == None: - msg.complete () - return - if hdr[0] != 'R': - msg.complete () - return - - sequence = codec.decode_long () - status = codec.decode_long () - sText = codec.decode_shortstr () - - data = self.sequenceManager.release (sequence) - if data == None: - msg.complete () - return - - (userSequence, className, methodName) = data - args = {} - - if status == 0: - ms = self.metadata.schema[(className,'M')] - arglist = None - for mname in ms: - (mdesc, margs) = ms[mname] - if mname == methodName: - arglist = margs - if arglist == None: - msg.complete () - return - - for arg in arglist: - if arg[2].find("O") != -1: - args[arg[0]] = self.metadata.decodeValue (codec, arg[1]) - - if self.method_cb != None: - self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args) - - msg.complete () - - def __init__ (self, - host = "localhost", - port = 5672, - username = "guest", - password = "guest", - specfile = "/usr/share/amqp/amqp.0-10-preview.xml"): - - self.spec = qpid.spec.load (specfile) - self.client = None - self.channel = None - self.queue = None - self.rqueue = None - self.qname = None - self.rqname = None - self.metadata = ManagementMetadata (self) - self.sequenceManager = SequenceManager () - self.connected = 0 - self.lastConnectError = None - - # Initialize the callback records - self.status_cb = None - self.schema_cb = None - self.config_cb = None - self.inst_cb = None - self.method_cb = None - - self.host = host - self.port = port - self.username = username - self.password = password - - def statusListener (self, context, callback): - self.status_cb = (context, callback) - - def schemaListener (self, context, callback): - self.schema_cb = (context, callback) - - def configListener (self, context, callback): - self.config_cb = (context, callback) - - def methodListener (self, context, callback): - self.method_cb = (context, callback) - - def instrumentationListener (self, context, callback): - self.inst_cb = (context, callback) - - def method (self, userSequence, objId, className, - methodName, args=None, packageName="qpid"): - codec = Codec (StringIO (), self.spec); - sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + def method (self, channel, userSequence, objId, classId, methodName, args): + """ Invoke a method on an object """ + codec = Codec (StringIO (), self.spec) + sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M')) codec.encode_long (sequence) # Method sequence id codec.encode_longlong (objId) # ID of object - #codec.encode_shortstr (self.rqname) # name of reply queue # Encode args according to schema - if (className,'M') not in self.metadata.schema: - self.sequenceManager.release (sequence) - raise ValueError ("Unknown class name: %s" % className) + if classId not in self.schema: + self.seqMgr.release (sequence) + raise ValueError ("Unknown class name: %s" % classId) - ms = self.metadata.schema[(className,'M')] - arglist = None + schemaClass = self.schema[classId] + ms = schemaClass['M'] + arglist = None for mname in ms: (mdesc, margs) = ms[mname] if mname == methodName: arglist = margs if arglist == None: - self.sequenceManager.release (sequence) + self.seqMgr.release (sequence) raise ValueError ("Unknown method name: %s" % methodName) for arg in arglist: @@ -427,65 +566,17 @@ class ManagedBroker: if arg[0] in args: value = args[arg[0]] if value == None: - self.sequenceManager.release (sequence) + self.seqMgr.release (sequence) raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) - self.metadata.encodeValue (codec, value, arg[1]) + self.encodeValue (codec, value, arg[1]) + packageName = classId[0] + className = classId[1] msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" - msg["routing_key"] = "method." + packageName + "." + className + "." + methodName + msg["routing_key"] = "agent.method." + packageName + "." + \ + className + "." + methodName msg["reply_to"] = self.spec.struct ("reply_to") msg["reply_to"]["exchange_name"] = "amq.direct" - msg["reply_to"]["routing_key"] = self.rqname - self.channel.message_transfer (destination="qpid.management", content=msg) - - def isConnected (self): - return connected - - def start (self): - print "Connecting to broker %s:%d" % (self.host, self.port) - - try: - self.client = Client (self.host, self.port, self.spec) - self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) - self.channel = self.client.channel (1) - response = self.channel.session_open (detached_lifetime=300) - self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) - self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) - - self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) - self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1) - - self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname, - routing_key="mgmt.#") - self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname, - routing_key=self.rqname) - - self.channel.message_subscribe (queue=self.qname, destination="mdest") - self.channel.message_subscribe (queue=self.rqname, destination="rdest") - - self.queue = self.client.queue ("mdest") - self.queue.listen (self.publish_cb) - - self.channel.message_flow_mode (destination="mdest", mode=1) - self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF) - self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF) - - self.rqueue = self.client.queue ("rdest") - self.rqueue.listen (self.reply_cb) - - self.channel.message_flow_mode (destination="rdest", mode=1) - self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) - self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) - - self.connected = 1 - - except socket.error, e: - print "Socket Error:", e[1] - self.lastConnectError = e - raise - except: - raise - - def stop (self): - pass + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index eab1033805..33c41fb884 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -47,7 +47,11 @@ <class name="system"> <configElement name="sysId" index="y" type="sstr" access="RC"/> - <!-- RT config/instrumentation TBD --> + <instElement name="osName" type="sstr" desc="Operating System Name"/> + <instElement name="nodeName" type="sstr" desc="Node Name"/> + <instElement name="release" type="sstr"/> + <instElement name="version" type="sstr"/> + <instElement name="machine" type="sstr"/> </class> @@ -57,20 +61,18 @@ =============================================================== --> <class name="broker"> - <configElement name="systemRef" type="objId" access="RC" index="y" desc="System ID"/> + <configElement name="systemRef" type="objId" access="RC" index="y" desc="System ID" parentRef="y"/> <configElement name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/> <configElement name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> <configElement name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> <configElement name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> <configElement name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> - <configElement name="storeLib" type="sstr" access="RO" desc="Name of persistent storage library"/> - <configElement name="asyncStore" type="bool" access="RO" desc="Use async persistent store"/> <configElement name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> - <configElement name="initialDiskPageSize" type="uint32" access="RO" desc="Number of disk pages allocated for storage"/> - <configElement name="initialPagesPerQueue" type="uint32" access="RO" desc="Number of disk pages allocated per queue"/> <configElement name="clusterName" type="sstr" access="RO" - desc="Name of cluster this server is a member of, zero-length for standalone server"/> + desc="Name of cluster this server is a member of"/> <configElement name="version" type="sstr" access="RO" desc="Running software version"/> + <configElement name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/> + <configElement name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/> <method name="joinCluster"> <arg name="clusterName" dir="I" type="sstr"/> @@ -137,9 +139,7 @@ <instElement name="consumers" type="hilo32" unit="consumer" desc="Current consumers on queue"/> <instElement name="bindings" type="hilo32" unit="binding" desc="Current bindings"/> <instElement name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> - <instElement name="messageLatencyMin" type="uint64" unit="nanosecond" desc="Minimum broker latency through this queue"/> - <instElement name="messageLatencyMax" type="uint64" unit="nanosecond" desc="Maximum broker latency through this queue"/> - <instElement name="messageLatencyAvg" type="uint64" unit="nanosecond" desc="Average broker latency through this queue"/> + <instElement name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/> <method name="purge" desc="Discard all messages on queue"/> </class> @@ -203,6 +203,9 @@ =============================================================== --> <class name="link"> + + This class represents an inter-broker connection. + <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> <configElement name="address" type="sstr" access="RC" index="y"/> diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml index 842a18cb30..6c86be3db1 100644 --- a/qpid/specs/management-types.xml +++ b/qpid/specs/management-types.xml @@ -29,6 +29,8 @@ <type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/> <type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> <type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/> +<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/> @@ -41,8 +43,9 @@ <type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0"/> <!-- Min/Max/Average statistics --> -<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/> -<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/> +<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/> +<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/> +<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/> <!-- Some Proposed Syntax for User-Defined Types: <enum name="enumeratedType" base="U8"> |