summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xqpid/cpp/managementgen/main.py8
-rwxr-xr-xqpid/cpp/managementgen/schema.py85
-rw-r--r--qpid/cpp/managementgen/templates/Class.cpp31
-rw-r--r--qpid/cpp/managementgen/templates/Class.h19
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h2
-rw-r--r--qpid/cpp/src/qpid/broker/System.cpp48
-rw-r--r--qpid/cpp/src/qpid/broker/System.h51
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.cpp48
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.h8
-rw-r--r--qpid/cpp/src/qpid/management/Manageable.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp292
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h84
-rw-r--r--qpid/cpp/src/qpid/management/ManagementExchange.cpp23
-rw-r--r--qpid/cpp/src/qpid/management/ManagementExchange.h4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h22
-rwxr-xr-xqpid/python/mgmt-cli/main.py34
-rw-r--r--qpid/python/mgmt-cli/managementdata.py152
-rw-r--r--qpid/python/qpid/codec.py32
-rw-r--r--qpid/python/qpid/management.py551
-rw-r--r--qpid/specs/management-schema.xml23
-rw-r--r--qpid/specs/management-types.xml7
24 files changed, 1130 insertions, 408 deletions
diff --git a/qpid/cpp/managementgen/main.py b/qpid/cpp/managementgen/main.py
index de8ce4cbe6..677c7321ae 100755
--- a/qpid/cpp/managementgen/main.py
+++ b/qpid/cpp/managementgen/main.py
@@ -28,6 +28,9 @@ usage = "usage: %prog [options] schema-document type-document template-director
parser = OptionParser (usage=usage)
parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE",
help="Makefile fragment")
+parser.add_option ("-i", "--include-prefix", dest="include_prefix", metavar="PATH",
+ default="qpid/management/",
+ help="Prefix for #include of generated headers in generated source, default: qpid/management/")
(opts, args) = parser.parse_args ()
@@ -39,8 +42,11 @@ typefile = args[1]
templatedir = args[2]
outdir = args[3]
+if opts.include_prefix == ".":
+ opts.include_prefix = None
+
gen = Generator (outdir, templatedir)
-schema = PackageSchema (typefile, schemafile)
+schema = PackageSchema (typefile, schemafile, opts)
gen.makeClassFiles ("Class.h", schema)
gen.makeClassFiles ("Class.cpp", schema)
diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py
index a459db7a47..fd76ba9112 100755
--- a/qpid/cpp/managementgen/schema.py
+++ b/qpid/cpp/managementgen/schema.py
@@ -21,6 +21,7 @@
from xml.dom.minidom import parse, parseString, Node
from cStringIO import StringIO
+import md5
#=====================================================================================
#
@@ -575,15 +576,18 @@ class SchemaEvent:
def getArgCount (self):
return len (self.args)
-#=====================================================================================
-#
-#=====================================================================================
+
class SchemaClass:
- def __init__ (self, node, typespec):
+ def __init__ (self, package, node, typespec, fragments, options):
+ self.packageName = package
self.configElements = []
self.instElements = []
self.methods = []
self.events = []
+ self.options = options
+ self.md5Sum = md5.new ()
+
+ self.hash (node)
attrs = node.attributes
self.name = attrs['name'].nodeValue
@@ -607,9 +611,40 @@ class SchemaClass:
sub = SchemaEvent (self, child, typespec)
self.events.append (sub)
+ elif child.nodeName == 'group':
+ self.expandFragment (child, fragments)
+
else:
raise ValueError ("Unknown class tag '%s'" % child.nodeName)
+ def hash (self, node):
+ attrs = node.attributes
+ self.md5Sum.update (node.nodeName)
+
+ for idx in range (attrs.length):
+ self.md5Sum.update (attrs.item(idx).nodeName)
+ self.md5Sum.update (attrs.item(idx).nodeValue)
+
+ for child in node.childNodes:
+ if child.nodeType == Node.ELEMENT_NODE:
+ self.hash (child)
+
+ def expandFragment (self, node, fragments):
+ attrs = node.attributes
+ name = attrs['name'].nodeValue
+ for fragment in fragments:
+ if fragment.name == name:
+ for config in fragment.configElements:
+ self.configElements.append (config)
+ for inst in fragment.instElements:
+ self.instElements.append (inst)
+ for method in fragment.methods:
+ self.methods.append (method)
+ for event in fragment.events:
+ self.events.append (event)
+ return
+ raise ValueError ("Undefined group '%s'" % name)
+
def getName (self):
return self.name
@@ -644,13 +679,9 @@ class SchemaClass:
def genConstructorArgs (self, stream, variables):
# Constructor args are config elements with read-create access
result = ""
- first = 1
for element in self.configElements:
if element.isConstructorArg ():
- if first == 1:
- first = 0
- else:
- stream.write (", ")
+ stream.write (", ")
element.genFormalParam (stream)
def genConstructorInits (self, stream, variables):
@@ -715,8 +746,8 @@ class SchemaClass:
def genMethodArgIncludes (self, stream, variables):
for method in self.methods:
if method.getArgCount () > 0:
- stream.write ("#include \"qpid/management/Args" +\
- method.getFullName () + ".h\"\n")
+ stream.write ("#include \"" + (self.options.include_prefix or "") +\
+ "Args" + method.getFullName () + ".h\"\n")
def genMethodCount (self, stream, variables):
stream.write ("%d" % len (self.methods))
@@ -765,13 +796,16 @@ class SchemaClass:
def genNameLower (self, stream, variables):
stream.write (self.name.lower ())
+ def genNamePackageLower (self, stream, variables):
+ stream.write (self.packageName.lower ())
+
def genNameUpper (self, stream, variables):
stream.write (self.name.upper ())
def genParentArg (self, stream, variables):
for config in self.configElements:
if config.isParentRef == 1:
- stream.write (" _parent")
+ stream.write (", Manageable* _parent")
return
def genParentRefAssignment (self, stream, variables):
@@ -781,6 +815,13 @@ class SchemaClass:
" = _parent->GetManagementObject ()->getObjectId ();")
return
+ def genSchemaMD5 (self, stream, variables):
+ sum = self.md5Sum.digest ()
+ for idx in range (len (sum)):
+ if idx != 0:
+ stream.write (",")
+ stream.write (hex (ord (sum[idx])))
+
def genWriteConfig (self, stream, variables):
for config in self.configElements:
config.genWrite (stream);
@@ -790,14 +831,13 @@ class SchemaClass:
inst.genWrite (stream);
-#=====================================================================================
-#
-#=====================================================================================
+
class PackageSchema:
- def __init__ (self, typefile, schemafile):
+ def __init__ (self, typefile, schemafile, options):
- self.classes = []
- self.typespec = TypeSpec (typefile)
+ self.classes = []
+ self.fragments = []
+ self.typespec = TypeSpec (typefile)
dom = parse (schemafile)
document = dom.documentElement
@@ -810,8 +850,15 @@ class PackageSchema:
for child in children:
if child.nodeType == Node.ELEMENT_NODE:
if child.nodeName == 'class':
- cls = SchemaClass (child, self.typespec)
+ cls = SchemaClass (self.packageName, child, self.typespec,
+ self.fragments, options)
self.classes.append (cls)
+
+ elif child.nodeName == 'group':
+ cls = SchemaClass (self.packageName, child, self.typespec,
+ self.fragments, options)
+ self.fragments.append (cls)
+
else:
raise ValueError ("Unknown schema tag '%s'" % child.nodeName)
diff --git a/qpid/cpp/managementgen/templates/Class.cpp b/qpid/cpp/managementgen/templates/Class.cpp
index d87d11f767..2a3f71e262 100644
--- a/qpid/cpp/managementgen/templates/Class.cpp
+++ b/qpid/cpp/managementgen/templates/Class.cpp
@@ -31,11 +31,14 @@ using namespace qpid::sys;
using namespace qpid::framing;
using std::string;
-bool /*MGEN:Class.NameCap*/::schemaNeeded = true;
-
-/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core, Manageable*/*MGEN:Class.ParentArg*/,
- /*MGEN:Class.ConstructorArgs*/) :
- ManagementObject(_core, "/*MGEN:Class.NameLower*/")
+string /*MGEN:Class.NameCap*/::packageName = string ("/*MGEN:Class.NamePackageLower*/");
+string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/");
+uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] =
+ {/*MGEN:Class.SchemaMD5*/};
+bool /*MGEN:Class.NameCap*/::firstInst = true;
+
+/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) :
+ ManagementObject(_core)
/*MGEN:Class.ConstructorInits*/
{
/*MGEN:Class.ParentRefAssignment*/
@@ -60,14 +63,26 @@ namespace {
const string DEFAULT("default");
}
+bool /*MGEN:Class.NameCap*/::firstInstance (void)
+{
+ Mutex::ScopedLock alock(accessorLock);
+ if (firstInst)
+ {
+ firstInst = false;
+ return true;
+ }
+
+ return false;
+}
+
void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
{
FieldTable ft;
- schemaNeeded = false;
-
// Schema class header:
- buf.putShortString (className); // Class Name
+ buf.putShortString (packageName); // Package Name
+ buf.putShortString (className); // Class Name
+ buf.putBin128 (md5Sum); // Schema Hash
buf.putShort (/*MGEN:Class.ConfigCount*/); // Config Element Count
buf.putShort (/*MGEN:Class.InstCount*/); // Inst Element Count
buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count
diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h
index ba6a1183e2..82fac00d47 100644
--- a/qpid/cpp/managementgen/templates/Class.h
+++ b/qpid/cpp/managementgen/templates/Class.h
@@ -33,22 +33,24 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
{
private:
- static bool schemaNeeded;
+ static std::string packageName;
+ static std::string className;
+ static uint8_t md5Sum[16];
+ static bool firstInst;
// Configuration Elements
/*MGEN:Class.ConfigDeclarations*/
// Instrumentation Elements
/*MGEN:Class.InstDeclarations*/
// Private Methods
- std::string getObjectName (void) { return "/*MGEN:Class.NameLower*/"; }
- void writeSchema (qpid::framing::Buffer& buf);
+ static void writeSchema (qpid::framing::Buffer& buf);
void writeConfig (qpid::framing::Buffer& buf);
void writeInstrumentation (qpid::framing::Buffer& buf);
- bool getSchemaNeeded (void) { return schemaNeeded; }
- void setSchemaNeeded (void) { schemaNeeded = true; }
void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf);
+ writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; }
+ bool firstInstance (void);
/*MGEN:Class.InstChangedStub*/
public:
@@ -56,10 +58,13 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr;
qpid::sys::Mutex accessorLock;
- /*MGEN:Class.NameCap*/ (Manageable* coreObject, Manageable* parentObject,
- /*MGEN:Class.ConstructorArgs*/);
+ /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
~/*MGEN:Class.NameCap*/ (void);
+ std::string getPackageName (void) { return packageName; }
+ std::string getClassName (void) { return className; }
+ uint8_t* getMd5Sum (void) { return md5Sum; }
+
// Method IDs
/*MGEN:Class.MethodIdDeclarations*/
// Accessor Methods
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index e3b95e045f..becccb4224 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -213,6 +213,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SessionContext.h \
qpid/broker/SessionHandler.cpp \
qpid/broker/SemanticHandler.cpp \
+ qpid/broker/System.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
@@ -331,6 +332,7 @@ nobase_include_HEADERS = \
qpid/broker/RecoveryManagerImpl.h \
qpid/broker/SemanticHandler.h \
qpid/broker/SessionManager.h \
+ qpid/broker/System.h \
qpid/broker/Timer.h \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 9bfa868d9c..8b70831cf7 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -131,13 +131,18 @@ Broker::Broker(const Broker::Options& conf) :
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
- mgmtObject = management::Broker::shared_ptr (new management::Broker (this, 0, 0, conf.port));
+ System* system = new System ();
+ systemObject = System::shared_ptr (system);
+
+ mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port));
mgmtObject->set_workerThreads (conf.workerThreads);
mgmtObject->set_maxConns (conf.maxConnections);
mgmtObject->set_connBacklog (conf.connectionBacklog);
mgmtObject->set_stagingThreshold (conf.stagingThreshold);
mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval);
mgmtObject->set_version (PACKAGE_VERSION);
+ mgmtObject->set_dataDirEnabled (dataDir.isEnabled ());
+ mgmtObject->set_dataDir (dataDir.getPath ());
managementAgent->addObject (mgmtObject, 1, 0);
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 153eabc6b3..9e5191825d 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -32,6 +32,7 @@
#include "SessionManager.h"
#include "PreviewSessionManager.h"
#include "Vhost.h"
+#include "System.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/management/Broker.h"
@@ -142,6 +143,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
+ System::shared_ptr systemObject;
void declareStandardExchange(const std::string& name, const std::string& type);
};
diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp
new file mode 100644
index 0000000000..87d5185b97
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/System.cpp
@@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "System.h"
+#include "qpid/management/ManagementAgent.h"
+#include <sys/utsname.h>
+
+using namespace qpid::broker;
+using qpid::management::ManagementAgent;
+
+System::System ()
+{
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::System::shared_ptr
+ (new management::System (this, "host"));
+ struct utsname _uname;
+ if (uname (&_uname) == 0)
+ {
+ mgmtObject->set_osName (std::string (_uname.sysname));
+ mgmtObject->set_nodeName (std::string (_uname.nodename));
+ mgmtObject->set_release (std::string (_uname.release));
+ mgmtObject->set_version (std::string (_uname.version));
+ mgmtObject->set_machine (std::string (_uname.machine));
+ }
+
+ agent->addObject (mgmtObject, 3, 0);
+ }
+}
+
diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h
new file mode 100644
index 0000000000..a1a710f2b2
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/System.h
@@ -0,0 +1,51 @@
+#ifndef _BrokerSystem_
+#define _BrokerSystem_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/Manageable.h"
+#include "qpid/management/System.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class System : public management::Manageable
+{
+ private:
+
+ management::System::shared_ptr mgmtObject;
+
+ public:
+
+ typedef boost::shared_ptr<System> shared_ptr;
+
+ System ();
+
+ management::ManagementObject::shared_ptr GetManagementObject (void) const
+ { return mgmtObject; }
+
+ management::Manageable::status_t ManagementMethod (uint32_t, management::Args&)
+ { return management::Manageable::STATUS_OK; }
+};
+
+}}
+
+#endif /*!_BrokerSystem_*/
diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp
index 7eadf377b9..c0cd210042 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.cpp
+++ b/qpid/cpp/src/qpid/framing/Buffer.cpp
@@ -74,6 +74,31 @@ void Buffer::putLongLong(uint64_t i){
putLong(lo);
}
+void Buffer::putFloat(float f){
+ union {
+ uint32_t i;
+ float f;
+ } val;
+
+ val.f = f;
+ putLong (val.i);
+}
+
+void Buffer::putDouble(double f){
+ union {
+ uint64_t i;
+ double f;
+ } val;
+
+ val.f = f;
+ putLongLong (val.i);
+}
+
+void Buffer::putBin128(uint8_t* b){
+ memcpy (data + position, b, 16);
+ position += 16;
+}
+
uint8_t Buffer::getOctet(){
return (uint8_t) data[position++];
}
@@ -104,6 +129,24 @@ uint64_t Buffer::getLongLong(){
return hi | lo;
}
+float Buffer::getFloat(){
+ union {
+ uint32_t i;
+ float f;
+ } val;
+ val.i = getLong();
+ return val.f;
+}
+
+double Buffer::getDouble(){
+ union {
+ uint64_t i;
+ double f;
+ } val;
+ val.i = getLongLong();
+ return val.f;
+}
+
template <>
uint64_t Buffer::getUInt<1>() {
return getOctet();
@@ -172,6 +215,11 @@ void Buffer::getLongString(string& s){
position += len;
}
+void Buffer::getBin128(uint8_t* b){
+ memcpy (b, data + position, 16);
+ position += 16;
+}
+
void Buffer::putRawData(const string& s){
uint32_t len = s.length();
s.copy(data + position, len);
diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h
index 5ab897d351..d0ca41f82b 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.h
+++ b/qpid/cpp/src/qpid/framing/Buffer.h
@@ -57,11 +57,16 @@ public:
void putShort(uint16_t i);
void putLong(uint32_t i);
void putLongLong(uint64_t i);
+ void putFloat(float f);
+ void putDouble(double f);
+ void putBin128(uint8_t* b);
- uint8_t getOctet();
+ uint8_t getOctet();
uint16_t getShort();
uint32_t getLong();
uint64_t getLongLong();
+ float getFloat();
+ double getDouble();
template <int n>
uint64_t getUInt();
@@ -73,6 +78,7 @@ public:
void putLongString(const string& s);
void getShortString(string& s);
void getLongString(string& s);
+ void getBin128(uint8_t* b);
void putRawData(const string& s);
void getRawData(string& s, uint32_t size);
diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h
index 155b71da54..1fb890c8c6 100644
--- a/qpid/cpp/src/qpid/management/Manageable.h
+++ b/qpid/cpp/src/qpid/management/Manageable.h
@@ -23,6 +23,7 @@
#include "ManagementObject.h"
#include "Args.h"
#include <string>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace management {
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 709f2a0ecd..bdbabbaf47 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -24,13 +24,13 @@
#include "qpid/log/Statement.h"
#include <qpid/broker/Message.h>
#include <qpid/broker/MessageDelivery.h>
-#include <qpid/framing/AMQFrame.h>
#include <list>
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace std;
ManagementAgent::shared_ptr ManagementAgent::agent;
bool ManagementAgent::enabled = 0;
@@ -39,6 +39,7 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
+ nextRemotePrefix = 101;
}
ManagementAgent::~ManagementAgent () {}
@@ -87,6 +88,15 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object,
object->setObjectId (objectId);
managementObjects[objectId] = object;
+
+ // If we've already seen instances of this object type, we're done.
+ if (!object->firstInstance ())
+ return;
+
+ // This is the first object of this type that we've seen, update the schema
+ // inventory.
+ PackageMap::iterator pIter = FindOrAddPackage (object->getPackageName ());
+ AddClassLocal (pIter, object);
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -102,15 +112,12 @@ void ManagementAgent::Periodic::fire ()
void ManagementAgent::clientAdded (void)
{
- RWlock::ScopedRlock readLock (userLock);
-
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
{
ManagementObject::shared_ptr object = iter->second;
- object->setAllChanged ();
- object->setSchemaNeeded ();
+ object->setAllChanged ();
}
}
@@ -142,6 +149,9 @@ void ManagementAgent::SendBuffer (Buffer& buf,
broker::Exchange::shared_ptr exchange,
string routingKey)
{
+ if (exchange.get() == 0)
+ return;
+
intrusive_ptr<Message> msg (new Message ());
AMQFrame method (in_place<MessageTransferBody>(
ProtocolVersion(), 0, exchange->getName (), 0, 0));
@@ -170,7 +180,6 @@ void ManagementAgent::SendBuffer (Buffer& buf,
void ManagementAgent::PeriodicProcessing (void)
{
#define BUFSIZE 65536
-#define THRESHOLD 16384
RWlock::ScopedWlock writeLock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
@@ -186,18 +195,6 @@ void ManagementAgent::PeriodicProcessing (void)
{
ManagementObject::shared_ptr object = iter->second;
- if (object->getSchemaNeeded ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'S');
- object->writeSchema (msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt.schema." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
@@ -239,17 +236,30 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
{
- size_t pos, start;
+ RWlock::ScopedRlock readLock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
- uint32_t contentSize;
- if (routingKey.compare (0, 7, "method.") != 0)
+ if (routingKey.compare (0, 13, "agent.method.") == 0)
+ dispatchMethod (msg, routingKey, 13);
+
+ else if (routingKey.length () == 5 &&
+ routingKey.compare (0, 5, "agent") == 0)
+ dispatchAgentCommand (msg);
+
+ else
{
QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
return;
}
+}
+
+void ManagementAgent::dispatchMethod (Message& msg,
+ const string& routingKey,
+ size_t first)
+{
+ size_t pos, start = first;
+ uint32_t contentSize;
- start = 7;
if (routingKey.length () == start)
{
QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
@@ -279,13 +289,11 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
string methodName = routingKey.substr (start, routingKey.length () - start);
contentSize = msg.encodedContentSize ();
- if (contentSize < 8 || contentSize > 65536)
+ if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
return;
- char *inMem = new char[contentSize];
- char outMem[4096]; // TODO Fix This
- Buffer inBuffer (inMem, contentSize);
- Buffer outBuffer (outMem, 4096);
+ Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
uint8_t opcode, unused;
@@ -321,7 +329,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
return;
}
- EncodeHeader (outBuffer, 'R');
+ EncodeHeader (outBuffer, 'm');
outBuffer.putLong (methodId);
ManagementObjectMap::iterator iter = managementObjects.find (objId);
@@ -335,9 +343,233 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
iter->second->doMethod (methodName, inBuffer, outBuffer);
}
- outLen = 4096 - outBuffer.available ();
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementAgent::handleHello (Buffer&, string replyToKey)
+{
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ uint8_t* dat = (uint8_t*) "Broker ID";
+ EncodeHeader (outBuffer, 'I');
+ outBuffer.putShort (9);
+ outBuffer.putRawData (dat, 9);
+
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey)
+{
+ for (PackageMap::iterator pIter = packages.begin ();
+ pIter != packages.end ();
+ pIter++)
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p');
+ EncodePackageIndication (outBuffer, pIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+}
+
+void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/)
+{
+ std::string packageName;
+
+ inBuffer.getShortString (packageName);
+ FindOrAddPackage (packageName);
+}
+
+void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey)
+{
+ std::string packageName;
+
+ inBuffer.getShortString (packageName);
+ PackageMap::iterator pIter = packages.find (packageName);
+ if (pIter != packages.end ())
+ {
+ ClassMap cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin ();
+ cIter != cMap.end ();
+ cIter++)
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'q');
+ EncodeClassIndication (outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+ }
+}
+
+void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString (packageName);
+ inBuffer.getShortString (key.name);
+ inBuffer.getBin128 (key.hash);
+
+ PackageMap::iterator pIter = packages.find (packageName);
+ if (pIter != packages.end ())
+ {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ SchemaClass classInfo = cIter->second;
+
+ if (classInfo.writeSchemaCall != 0)
+ {
+ EncodeHeader (outBuffer, 's');
+ classInfo.writeSchemaCall (outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+ else
+ {
+ // TODO: Forward request to remote agent.
+ }
+
+ clientAdded ();
+ // TODO: Send client-added to each remote agent.
+ }
+ }
+}
+
+uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/)
+{
+ // TODO: Allow remote agents to keep their requested prefixes if able.
+ return nextRemotePrefix++;
+}
+
+void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey)
+{
+ string label;
+ uint32_t requestedPrefix;
+ uint32_t assignedPrefix;
+
+ inBuffer.getShortString (label);
+ requestedPrefix = inBuffer.getLong ();
+ assignedPrefix = assignPrefix (requestedPrefix);
+
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'a');
+ outBuffer.putLong (assignedPrefix);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- free (inMem);
+}
+
+void ManagementAgent::dispatchAgentCommand (Message& msg)
+{
+ Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode, unused;
+ string replyToKey;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ return;
+
+ msg.encodeContent (inBuffer);
+ inBuffer.reset ();
+
+ if (!CheckHeader (inBuffer, &opcode, &unused))
+ return;
+
+ if (opcode == 'H') handleHello (inBuffer, replyToKey);
+ else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey);
+ else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey);
+ else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey);
+ else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey);
+ else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey);
+}
+
+ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name)
+{
+ PackageMap::iterator pIter = packages.find (name);
+ if (pIter != packages.end ())
+ return pIter;
+
+ // No such package found, create a new map entry.
+ pair<PackageMap::iterator, bool> result =
+ packages.insert (pair<string, ClassMap> (name, ClassMap ()));
+ QPID_LOG (debug, "ManagementAgent added package " << name);
+
+ // Publish a package-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p');
+ EncodePackageIndication (outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package");
+
+ return result.first;
+}
+
+void ManagementAgent::AddClassLocal (PackageMap::iterator pIter,
+ ManagementObject::shared_ptr object)
+{
+ SchemaClassKey key;
+ ClassMap& cMap = pIter->second;
+
+ key.name = object->getClassName ();
+ memcpy (&key.hash, object->getMd5Sum (), 16);
+
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ return;
+
+ // No such class found, create a new class with local information.
+ QPID_LOG (debug, "ManagementAgent added class " << pIter->first << "." <<
+ key.name);
+ SchemaClass classInfo;
+
+ classInfo.writeSchemaCall = object->getWriteSchemaCall ();
+ cMap[key] = classInfo;
+
+ // TODO: Publish a class-indication message
+}
+
+void ManagementAgent::EncodePackageIndication (Buffer& buf,
+ PackageMap::iterator pIter)
+{
+ buf.putShortString ((*pIter).first);
+}
+
+void ManagementAgent::EncodeClassIndication (Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
+{
+ SchemaClassKey key = (*cIter).first;
+
+ buf.putShortString ((*pIter).first);
+ buf.putShortString (key.name);
+ buf.putBin128 (key.hash);
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 30b8857c27..2acbe124bd 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -27,6 +27,7 @@
#include "qpid/broker/Timer.h"
#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
+#include <qpid/framing/AMQFrame.h>
#include <boost/shared_ptr.hpp>
namespace qpid {
@@ -70,16 +71,76 @@ class ManagementAgent
void fire ();
};
+ // Storage for tracking remote management agents, attached via the client
+ // management agent API.
+ //
+ struct RemoteAgent
+ {
+ std::string name;
+ uint64_t objIdBase;
+ };
+
+ // TODO: Eventually replace string with entire reply-to structure. reply-to
+ // currently assumes that the exchange is "amq.direct" even though it could
+ // in theory be specified differently.
+ typedef std::map<std::string, RemoteAgent> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
+
+ // Storage for known schema classes:
+ //
+ // SchemaClassKey -- Key elements for map lookups
+ // SchemaClassKeyComp -- Comparison class for SchemaClassKey
+ // SchemaClass -- Non-key elements for classes
+ //
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ struct SchemaClass
+ {
+ ManagementObject::writeSchemaCall_t writeSchemaCall;
+ ReplyToVector remoteAgents;
+
+ SchemaClass () : writeSchemaCall(0) {}
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ RemoteAgentMap remoteAgents;
+ PackageMap packages;
+ ManagementObjectMap managementObjects;
+
static shared_ptr agent;
static bool enabled;
qpid::sys::RWlock userLock;
- ManagementObjectMap managementObjects;
broker::Timer timer;
broker::Exchange::shared_ptr mExchange;
broker::Exchange::shared_ptr dExchange;
uint16_t interval;
uint64_t nextObjectId;
+ uint32_t nextRemotePrefix;
+
+# define MA_BUFFER_SIZE 65536
+ char inputBuffer[MA_BUFFER_SIZE];
+ char outputBuffer[MA_BUFFER_SIZE];
void PeriodicProcessing (void);
void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0);
@@ -88,6 +149,27 @@ class ManagementAgent
uint32_t length,
broker::Exchange::shared_ptr exchange,
std::string routingKey);
+
+ void dispatchMethod (broker::Message& msg,
+ const std::string& routingKey,
+ size_t first);
+ void dispatchAgentCommand (broker::Message& msg);
+
+ PackageMap::iterator FindOrAddPackage (std::string name);
+ void AddClassLocal (PackageMap::iterator pIter,
+ ManagementObject::shared_ptr object);
+ void EncodePackageIndication (qpid::framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void EncodeClassIndication (qpid::framing::Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter);
+ uint32_t assignPrefix (uint32_t requestedPrefix);
+ void handleHello (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+ void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+ void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+ void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+ void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+ void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey);
};
}}
diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.cpp b/qpid/cpp/src/qpid/management/ManagementExchange.cpp
index ee18f026e7..c589aefba0 100644
--- a/qpid/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementExchange.cpp
@@ -36,28 +36,15 @@ ManagementExchange::ManagementExchange (const std::string& _name,
Exchange (_name, _durable, _args, _parent),
TopicExchange(_name, _durable, _args, _parent) {}
-
-bool ManagementExchange::bind (Queue::shared_ptr queue,
- const string& routingKey,
- const FieldTable* args)
-{
- bool result = TopicExchange::bind (queue, routingKey, args);
-
- // Notify the management agent that a new management client has bound to the
- // exchange.
- if (result)
- managementAgent->clientAdded ();
-
- return result;
-}
-
void ManagementExchange::route (Deliverable& msg,
const string& routingKey,
const FieldTable* args)
{
- // Intercept management commands
- if (routingKey.length () > 7 &&
- routingKey.substr (0, 7).compare ("method.") == 0)
+ // Intercept management agent commands
+ if ((routingKey.length () > 6 &&
+ routingKey.substr (0, 6).compare ("agent.") == 0) ||
+ (routingKey.length () == 5 &&
+ routingKey.substr (0, 5).compare ("agent") == 0))
{
managementAgent->dispatchCommand (msg, routingKey, args);
return;
diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.h b/qpid/cpp/src/qpid/management/ManagementExchange.h
index 1a79482c9d..7faec32b0f 100644
--- a/qpid/cpp/src/qpid/management/ManagementExchange.h
+++ b/qpid/cpp/src/qpid/management/ManagementExchange.h
@@ -42,10 +42,6 @@ class ManagementExchange : public virtual TopicExchange
virtual std::string getType() const { return typeName; }
- virtual bool bind (Queue::shared_ptr queue,
- const string& routingKey,
- const qpid::framing::FieldTable* args);
-
virtual void route (Deliverable& msg,
const string& routingKey,
const qpid::framing::FieldTable* args);
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index c2d1f56be0..6af5412b99 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -29,7 +29,9 @@ using namespace qpid::sys;
void ManagementObject::writeTimestamps (Buffer& buf)
{
- buf.putShortString (className);
+ buf.putShortString (getPackageName ());
+ buf.putShortString (getClassName ());
+ buf.putBin128 (getMd5Sum ());
buf.putLongLong (uint64_t (Duration (now ())));
buf.putLongLong (createTime);
buf.putLongLong (destroyTime);
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index a32055721d..87c3ccf22a 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -44,8 +44,7 @@ class ManagementObject
bool instChanged;
bool deleted;
Manageable* coreObject;
- std::string className;
-
+
static const uint8_t TYPE_U8 = 1;
static const uint8_t TYPE_U16 = 2;
static const uint8_t TYPE_U32 = 3;
@@ -56,6 +55,8 @@ class ManagementObject
static const uint8_t TYPE_DELTATIME = 9;
static const uint8_t TYPE_REF = 10;
static const uint8_t TYPE_BOOL = 11;
+ static const uint8_t TYPE_FLOAT = 12;
+ static const uint8_t TYPE_DOUBLE = 13;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;
@@ -73,23 +74,26 @@ class ManagementObject
public:
typedef boost::shared_ptr<ManagementObject> shared_ptr;
+ typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
- ManagementObject (Manageable* _core, std::string _name) :
+ ManagementObject (Manageable* _core) :
destroyTime(0), objectId (0), configChanged(true),
- instChanged(true), deleted(false), coreObject(_core), className(_name)
+ instChanged(true), deleted(false), coreObject(_core)
{ createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
virtual ~ManagementObject () {}
- virtual void writeSchema (qpid::framing::Buffer& buf) = 0;
+ virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
+ virtual bool firstInstance (void) = 0;
virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
- virtual bool getSchemaNeeded (void) = 0;
- virtual void setSchemaNeeded (void) = 0;
virtual void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
- std::string getClassName (void) { return className; }
+ virtual std::string getClassName (void) = 0;
+ virtual std::string getPackageName (void) = 0;
+ virtual uint8_t* getMd5Sum (void) = 0;
+
void setObjectId (uint64_t oid) { objectId = oid; }
uint64_t getObjectId (void) { return objectId; }
inline bool getConfigChanged (void) { return configChanged; }
@@ -108,7 +112,7 @@ class ManagementObject
};
- typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap;
+typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap;
}}
diff --git a/qpid/python/mgmt-cli/main.py b/qpid/python/mgmt-cli/main.py
index 76e1f25c14..f4c22012eb 100755
--- a/qpid/python/mgmt-cli/main.py
+++ b/qpid/python/mgmt-cli/main.py
@@ -104,7 +104,10 @@ class Mcli (Cmd):
self.dataObject.do_list (data)
def do_call (self, data):
- self.dataObject.do_call (data)
+ try:
+ self.dataObject.do_call (data)
+ except ValueError, e:
+ print "ValueError:", e
def do_EOF (self, data):
print "quit"
@@ -121,7 +124,10 @@ class Mcli (Cmd):
self.dataObject.close ()
def Usage ():
- print sys.argv[0], "[<target-host> [<tcp-port>]]"
+ print "Usage:", sys.argv[0], "[OPTIONS] [<target-host> [<tcp-port>]]"
+ print
+ print "Options:"
+ print " -s <amqp-spec-file> default: /usr/share/amqp/amqp.0-10-preview.xml"
print
sys.exit (1)
@@ -134,13 +140,15 @@ try:
(optlist, cargs) = getopt.getopt (sys.argv[1:], 's:')
except:
Usage ()
+ exit (1)
specpath = "/usr/share/amqp/amqp.0-10-preview.xml"
host = "localhost"
port = 5672
-if "s" in optlist:
- specpath = optlist["s"]
+for opt in optlist:
+ if opt[0] == "-s":
+ specpath = opt[1]
if len (cargs) > 0:
host = cargs[0]
@@ -148,19 +156,27 @@ if len (cargs) > 0:
if len (cargs) > 1:
port = int (cargs[1])
-print ("Management Tool for QPID")
disp = Display ()
# Attempt to make a connection to the target broker
try:
- data = ManagementData (disp, host, port, spec=specpath)
+ data = ManagementData (disp, host, port, specfile=specpath)
except socket.error, e:
- sys.exit (0)
+ print "Socket Error:", e[1]
+ sys.exit (1)
except Closed, e:
if str(e).find ("Exchange not found") != -1:
print "Management not enabled on broker: Use '-m yes' option on broker startup."
- sys.exit (0)
+ sys.exit (1)
+except IOError, e:
+ print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename)
+ sys.exit (1)
# Instantiate the CLI interpreter and launch it.
cli = Mcli (data, disp)
-cli.cmdloop ()
+print ("Management Tool for QPID")
+try:
+ cli.cmdloop ()
+except Closed, e:
+ print "Connection to Broker Lost:", e
+ exit (1)
diff --git a/qpid/python/mgmt-cli/managementdata.py b/qpid/python/mgmt-cli/managementdata.py
index e7233c98ae..5b13594994 100644
--- a/qpid/python/mgmt-cli/managementdata.py
+++ b/qpid/python/mgmt-cli/managementdata.py
@@ -19,10 +19,12 @@
# under the License.
#
-from qpid.management import ManagedBroker
+import qpid
+from qpid.management import managementChannel, managementClient
from threading import Lock
from disp import Display
from shlex import split
+from qpid.client import Client
class ManagementData:
@@ -35,9 +37,10 @@ class ManagementData:
# The only historical data it keeps are the high and low watermarks
# for hi-lo statistics.
#
- # tables :== {<class-name>}
+ # tables :== {class-key}
# {<obj-id>}
# (timestamp, config-record, inst-record)
+ # class-key :== (<package-name>, <class-name>, <class-hash>)
# timestamp :== (<last-interval-time>, <create-time>, <delete-time>)
# config-record :== [element]
# inst-record :== [element]
@@ -59,6 +62,10 @@ class ManagementData:
return displayId + self.baseId
return displayId - 5000 + 0x8000000000000000L
+ def displayClassName (self, cls):
+ (packageName, className, hash) = cls
+ return packageName + "." + className
+
def dataHandler (self, context, className, list, timestamps):
""" Callback for configuration and instrumentation data updates """
self.lock.acquire ()
@@ -104,6 +111,12 @@ class ManagementData:
finally:
self.lock.release ()
+ def configHandler (self, context, className, list, timestamps):
+ self.dataHandler (0, className, list, timestamps);
+
+ def instHandler (self, context, className, list, timestamps):
+ self.dataHandler (1, className, list, timestamps);
+
def methodReply (self, broker, sequence, status, sText, args):
""" Callback for method-reply messages """
self.lock.acquire ()
@@ -121,12 +134,8 @@ class ManagementData:
self.schema[className] = (configs, insts, methods, events)
def __init__ (self, disp, host, port=5672, username="guest", password="guest",
- spec="../../specs/amqp.0-10-preview.xml"):
- self.broker = ManagedBroker (host, port, username, password, spec)
- self.broker.configListener (0, self.dataHandler)
- self.broker.instrumentationListener (1, self.dataHandler)
- self.broker.methodListener (None, self.methodReply)
- self.broker.schemaListener (None, self.schemaHandler)
+ specfile="../../specs/amqp.0-10-preview.xml"):
+ self.spec = qpid.spec.load (specfile)
self.lock = Lock ()
self.tables = {}
self.schema = {}
@@ -135,24 +144,33 @@ class ManagementData:
self.lastUnit = None
self.methodSeq = 1
self.methodsPending = {}
- self.broker.start ()
+
+ self.client = Client (host, port, self.spec)
+ self.client.start ({"LOGIN": username, "PASSWORD": password})
+ self.channel = self.client.channel (1)
+
+ self.mclient = managementClient (self.spec, None, self.configHandler,
+ self.instHandler, self.methodReply)
+ self.mclient.schemaListener (self.schemaHandler)
+ self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb)
+ self.mclient.addChannel (self.mch)
def close (self):
- self.broker.stop ()
+ self.mclient.removeChannel (self.mch)
def refName (self, oid):
if oid == 0:
return "NULL"
return str (self.displayObjId (oid))
- def valueDisplay (self, className, key, value):
+ def valueDisplay (self, classKey, key, value):
for kind in range (2):
- schema = self.schema[className][kind]
+ schema = self.schema[classKey][kind]
for item in schema:
if item[0] == key:
typecode = item[1]
unit = item[2]
- if typecode >= 1 and typecode <= 5: # numerics
+ if (typecode >= 1 and typecode <= 5) or typecode >= 12: # numerics
if unit == None or unit == self.lastUnit:
return str (value)
else:
@@ -191,6 +209,20 @@ class ManagementData:
result = result + self.valueDisplay (className, key, val)
return result
+ def getClassKey (self, className):
+ dotPos = className.find(".")
+ if dotPos == -1:
+ for key in self.schema:
+ if key[1] == className:
+ return key
+ else:
+ package = className[0:dotPos]
+ name = className[dotPos + 1:]
+ for key in self.schema:
+ if key[0] == package and key[1] == name:
+ return key
+ return None
+
def classCompletions (self, prefix):
""" Provide a list of candidate class names for command completion """
self.lock.acquire ()
@@ -227,6 +259,10 @@ class ManagementData:
return "reference"
elif typecode == 11:
return "boolean"
+ elif typecode == 12:
+ return "float"
+ elif typecode == 13:
+ return "double"
else:
raise ValueError ("Invalid type code: %d" % typecode)
@@ -253,16 +289,16 @@ class ManagementData:
return False
return True
- def listOfIds (self, className, tokens):
+ def listOfIds (self, classKey, tokens):
""" Generate a tuple of object ids for a classname based on command tokens. """
list = []
if tokens[0] == "all":
- for id in self.tables[className]:
+ for id in self.tables[classKey]:
list.append (self.displayObjId (id))
elif tokens[0] == "active":
- for id in self.tables[className]:
- if self.tables[className][id][0][2] == 0:
+ for id in self.tables[classKey]:
+ if self.tables[classKey][id][0][2] == 0:
list.append (self.displayObjId (id))
else:
@@ -271,7 +307,7 @@ class ManagementData:
if token.find ("-") != -1:
ids = token.split("-", 2)
for id in range (int (ids[0]), int (ids[1]) + 1):
- if self.getClassForId (self.rawObjId (long (id))) == className:
+ if self.getClassForId (self.rawObjId (long (id))) == classKey:
list.append (id)
else:
list.append (token)
@@ -301,7 +337,7 @@ class ManagementData:
deleted = deleted + 1
else:
active = active + 1
- rows.append ((name, active, deleted))
+ rows.append ((self.displayClassName (name), active, deleted))
self.disp.table ("Management Object Types:",
("ObjectType", "Active", "Deleted"), rows)
finally:
@@ -311,22 +347,23 @@ class ManagementData:
""" Generate a display of a list of objects in a class """
self.lock.acquire ()
try:
- if className not in self.tables:
+ classKey = self.getClassKey (className)
+ if classKey == None:
print ("Object type %s not known" % className)
else:
rows = []
- sorted = self.tables[className].keys ()
+ sorted = self.tables[classKey].keys ()
sorted.sort ()
for objId in sorted:
- (ts, config, inst) = self.tables[className][objId]
+ (ts, config, inst) = self.tables[classKey][objId]
createTime = self.disp.timestamp (ts[1])
destroyTime = "-"
if ts[2] > 0:
destroyTime = self.disp.timestamp (ts[2])
- objIndex = self.getObjIndex (className, config)
+ objIndex = self.getObjIndex (classKey, config)
row = (self.refName (objId), createTime, destroyTime, objIndex)
rows.append (row)
- self.disp.table ("Objects of type %s" % className,
+ self.disp.table ("Objects of type %s.%s" % (classKey[0], classKey[1]),
("ID", "Created", "Destroyed", "Index"),
rows)
finally:
@@ -343,57 +380,57 @@ class ManagementData:
else:
rootId = int (tokens[0])
- className = self.getClassForId (self.rawObjId (rootId))
+ classKey = self.getClassForId (self.rawObjId (rootId))
remaining = tokens
- if className == None:
+ if classKey == None:
print "Id not known: %d" % int (tokens[0])
raise ValueError ()
else:
- className = tokens[0]
+ classKey = self.getClassKey (tokens[0])
remaining = tokens[1:]
- if className not in self.tables:
- print "Class not known: %s" % className
+ if classKey not in self.tables:
+ print "Class not known: %s" % tokens[0]
raise ValueError ()
- userIds = self.listOfIds (className, remaining)
+ userIds = self.listOfIds (classKey, remaining)
if len (userIds) == 0:
print "No object IDs supplied"
raise ValueError ()
ids = []
for id in userIds:
- if self.getClassForId (self.rawObjId (long (id))) == className:
+ if self.getClassForId (self.rawObjId (long (id))) == classKey:
ids.append (self.rawObjId (long (id)))
rows = []
timestamp = None
- config = self.tables[className][ids[0]][1]
+ config = self.tables[classKey][ids[0]][1]
for eIdx in range (len (config)):
key = config[eIdx][0]
if key != "id":
row = ("config", key)
for id in ids:
if timestamp == None or \
- timestamp < self.tables[className][id][0][0]:
- timestamp = self.tables[className][id][0][0]
- (key, value) = self.tables[className][id][1][eIdx]
- row = row + (self.valueDisplay (className, key, value),)
+ timestamp < self.tables[classKey][id][0][0]:
+ timestamp = self.tables[classKey][id][0][0]
+ (key, value) = self.tables[classKey][id][1][eIdx]
+ row = row + (self.valueDisplay (classKey, key, value),)
rows.append (row)
- inst = self.tables[className][ids[0]][2]
+ inst = self.tables[classKey][ids[0]][2]
for eIdx in range (len (inst)):
key = inst[eIdx][0]
if key != "id":
row = ("inst", key)
for id in ids:
- (key, value) = self.tables[className][id][2][eIdx]
- row = row + (self.valueDisplay (className, key, value),)
+ (key, value) = self.tables[classKey][id][2][eIdx]
+ row = row + (self.valueDisplay (classKey, key, value),)
rows.append (row)
titleRow = ("Type", "Element")
for id in ids:
titleRow = titleRow + (self.refName (id),)
- caption = "Object of type %s:" % className
+ caption = "Object of type %s.%s:" % (classKey[0], classKey[1])
if timestamp != None:
caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")"
self.disp.table (caption, titleRow, rows)
@@ -423,12 +460,13 @@ class ManagementData:
""" Generate a display of details of the schema of a particular class """
self.lock.acquire ()
try:
- if className not in self.schema:
+ classKey = self.getClassKey (className)
+ if classKey == None:
print ("Class name %s not known" % className)
raise ValueError ()
rows = []
- for config in self.schema[className][0]:
+ for config in self.schema[classKey][0]:
name = config[0]
if name != "id":
typename = self.typeName(config[1])
@@ -446,7 +484,7 @@ class ManagementData:
extra = extra + "MaxLen: " + str (config[8])
rows.append ((name, typename, unit, access, extra, desc))
- for config in self.schema[className][1]:
+ for config in self.schema[classKey][1]:
name = config[0]
if name != "id":
typename = self.typeName(config[1])
@@ -455,10 +493,10 @@ class ManagementData:
rows.append ((name, typename, unit, "", "", desc))
titles = ("Element", "Type", "Unit", "Access", "Notes", "Description")
- self.disp.table ("Schema for class '%s':" % className, titles, rows)
+ self.disp.table ("Schema for class '%s.%s':" % (classKey[0], classKey[1]), titles, rows)
- for mname in self.schema[className][2]:
- (mdesc, args) = self.schema[className][2][mname]
+ for mname in self.schema[classKey][2]:
+ (mdesc, args) = self.schema[classKey][2][mname]
caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc))
rows = []
for arg in args:
@@ -485,25 +523,25 @@ class ManagementData:
self.lock.release ()
def getClassForId (self, objId):
- """ Given an object ID, return the class name for the referenced object """
- for className in self.tables:
- if objId in self.tables[className]:
- return className
+ """ Given an object ID, return the class key for the referenced object """
+ for classKey in self.tables:
+ if objId in self.tables[classKey]:
+ return classKey
return None
def callMethod (self, userOid, methodName, args):
self.lock.acquire ()
methodOk = True
try:
- className = self.getClassForId (self.rawObjId (userOid))
- if className == None:
+ classKey = self.getClassForId (self.rawObjId (userOid))
+ if classKey == None:
raise ValueError ()
- if methodName not in self.schema[className][2]:
- print "Method '%s' not valid for class '%s'" % (methodName, className)
+ if methodName not in self.schema[classKey][2]:
+ print "Method '%s' not valid for class '%s.%s'" % (methodName, classKey[0], classKey[1])
raise ValueError ()
- schemaMethod = self.schema[className][2][methodName]
+ schemaMethod = self.schema[classKey][2][methodName]
if len (args) != len (schemaMethod[1]):
print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args))
raise ValueError ()
@@ -519,8 +557,8 @@ class ManagementData:
self.lock.release ()
if methodOk:
# try:
- self.broker.method (self.methodSeq, self.rawObjId (userOid), className,
- methodName, namedArgs)
+ self.mclient.callMethod (self.mch, self.methodSeq, self.rawObjId (userOid), classKey,
+ methodName, namedArgs)
# except ValueError, e:
# print "Error invoking method:", e
diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py
index b25de11f11..1a9372455d 100644
--- a/qpid/python/qpid/codec.py
+++ b/qpid/python/qpid/codec.py
@@ -265,6 +265,38 @@ class Codec:
"""
return self.unpack("!Q")
+ def encode_float(self, o):
+ self.pack("!f", o)
+
+ def decode_float(self):
+ return self.unpack("!f")
+
+ def encode_double(self, o):
+ self.pack("!d", o)
+
+ def decode_double(self):
+ return self.unpack("!d")
+
+ def encode_bin128(self, b):
+ for idx in range (0,16):
+ self.pack("!B", ord (b[idx]))
+
+ def decode_bin128(self):
+ result = ""
+ for idx in range (0,16):
+ result = result + chr (self.unpack("!B"))
+ return result
+
+ def encode_raw(self, len, b):
+ for idx in range (0,len):
+ self.pack("!B", b[idx])
+
+ def decode_raw(self, len):
+ result = ""
+ for idx in range (0,len):
+ result = result + chr (self.unpack("!B"))
+ return result
+
def enc_str(self, fmt, s):
"""
encodes a string 's' in network byte order as per format 'fmt'
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index 40de2a5298..b5d992cf5d 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -35,12 +35,14 @@ from threading import Lock
class SequenceManager:
+ """ Manage sequence numbers for asynchronous method calls """
def __init__ (self):
self.lock = Lock ()
self.sequence = 0
self.pending = {}
def reserve (self, data):
+ """ Reserve a unique sequence number """
self.lock.acquire ()
result = self.sequence
self.sequence = self.sequence + 1
@@ -49,6 +51,7 @@ class SequenceManager:
return result
def release (self, seq):
+ """ Release a reserved sequence number """
data = None
self.lock.acquire ()
if seq in self.pending:
@@ -57,12 +60,172 @@ class SequenceManager:
self.lock.release ()
return data
-class ManagementMetadata:
- """One instance of this class is created for each ManagedBroker. It
- is used to store metadata from the broker which is needed for the
- proper interpretation of received management content."""
+
+class managementChannel:
+ """ This class represents a connection to an AMQP broker. """
+
+ def __init__ (self, ch, topicCb, replyCb, cbContext=None):
+ """ Given a channel on an established AMQP broker connection, this method
+ opens a session and performs all of the declarations and bindings needed
+ to participate in the management protocol. """
+ response = ch.session_open (detached_lifetime=300)
+ self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
+ self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id)
+ self.qpidChannel = ch
+ self.tcb = topicCb
+ self.rcb = replyCb
+ self.context = cbContext
+
+ ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
+ ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
+
+ ch.queue_bind (exchange="qpid.management",
+ queue=self.topicName, routing_key="mgmt.#")
+ ch.queue_bind (exchange="amq.direct",
+ queue=self.replyName, routing_key=self.replyName)
+ ch.message_subscribe (queue=self.topicName, destination="tdest")
+ ch.message_subscribe (queue=self.replyName, destination="rdest")
+
+ ch.client.queue ("tdest").listen (self.topicCb)
+ ch.client.queue ("rdest").listen (self.replyCb)
+
+ ch.message_flow_mode (destination="tdest", mode=1)
+ ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
+ ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
+
+ ch.message_flow_mode (destination="rdest", mode=1)
+ ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+ ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
+
+ def topicCb (self, msg):
+ """ Receive messages via the topic queue on this channel. """
+ self.tcb (self, msg)
+
+ def replyCb (self, msg):
+ """ Receive messages via the reply queue on this channel. """
+ self.rcb (self, msg)
+
+ def send (self, exchange, msg):
+ self.qpidChannel.message_transfer (destination=exchange, content=msg)
+
+
+class managementClient:
+ """ This class provides an API for access to management data on the AMQP
+ network. It implements the management protocol and manages the management
+ schemas as advertised by the various management agents in the network. """
+
+ #========================================================
+ # User API - interacts with the class's user
+ #========================================================
+ def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None):
+ self.spec = amqpSpec
+ self.ctrlCb = ctrlCb
+ self.configCb = configCb
+ self.instCb = instCb
+ self.methodCb = methodCb
+ self.schemaCb = None
+ self.eventCb = None
+ self.channels = []
+ self.seqMgr = SequenceManager ()
+ self.schema = {}
+ self.packages = {}
+
+ def schemaListener (self, schemaCb):
+ """ Optionally register a callback to receive details of the schema of
+ managed objects in the network. """
+ self.schemaCb = schemaCb
+
+ def eventListener (self, eventCb):
+ """ Optionally register a callback to receive events from managed objects
+ in the network. """
+ self.eventCb = eventCb
+
+ def addChannel (self, channel):
+ """ Register a new channel. """
+ self.channels.append (channel)
+ codec = Codec (StringIO (), self.spec)
+ self.setHeader (codec, ord ('H'))
+ msg = Content (codec.stream.getvalue ())
+ msg["content_type"] = "application/octet-stream"
+ msg["routing_key"] = "agent"
+ msg["reply_to"] = self.spec.struct ("reply_to")
+ msg["reply_to"]["exchange_name"] = "amq.direct"
+ msg["reply_to"]["routing_key"] = channel.replyName
+ channel.send ("qpid.management", msg)
+
+ def removeChannel (self, channel):
+ """ Remove a previously added channel from management. """
+ self.channels.remove (channel)
+
+ def callMethod (self, channel, userSequence, objId, className, methodName, args=None):
+ """ Invoke a method on a managed object. """
+ self.method (channel, userSequence, objId, className, methodName, args)
+
+ #========================================================
+ # Channel API - interacts with registered channel objects
+ #========================================================
+ def topicCb (self, ch, msg):
+ """ Receive messages via the topic queue of a particular channel. """
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ raise ValueError ("outer header invalid");
+ self.parse (ch, codec, hdr[0], hdr[1])
+ msg.complete ()
+
+ def replyCb (self, ch, msg):
+ """ Receive messages via the reply queue of a particular channel. """
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ msg.complete ()
+ return
+
+ if hdr[0] == 'm':
+ self.handleMethodReply (ch, codec)
+ elif hdr[0] == 'I':
+ self.handleInit (ch, codec)
+ elif hdr[0] == 'p':
+ self.handlePackageInd (ch, codec)
+ elif hdr[0] == 'q':
+ self.handleClassInd (ch, codec)
+ else:
+ self.parse (ch, codec, hdr[0], hdr[1])
+ msg.complete ()
+
+ #========================================================
+ # Internal Functions
+ #========================================================
+ def setHeader (self, codec, opcode, cls = 0):
+ """ Compose the header of a management message. """
+ codec.encode_octet (ord ('A'))
+ codec.encode_octet (ord ('M'))
+ codec.encode_octet (ord ('0'))
+ codec.encode_octet (ord ('1'))
+ codec.encode_octet (opcode)
+ codec.encode_octet (cls)
+
+ def checkHeader (self, codec):
+ """ Check the header of a management message and extract the opcode and
+ class. """
+ octet = chr (codec.decode_octet ())
+ if octet != 'A':
+ return None
+ octet = chr (codec.decode_octet ())
+ if octet != 'M':
+ return None
+ octet = chr (codec.decode_octet ())
+ if octet != '0':
+ return None
+ octet = chr (codec.decode_octet ())
+ if octet != '1':
+ return None
+ opcode = chr (codec.decode_octet ())
+ cls = chr (codec.decode_octet ())
+ return (opcode, cls)
def encodeValue (self, codec, value, typecode):
+ """ Encode, into the codec, a value based on its typecode. """
if typecode == 1:
codec.encode_octet (int (value))
elif typecode == 2:
@@ -85,10 +248,15 @@ class ManagementMetadata:
codec.encode_longlong (long (value))
elif typecode == 11: # BOOL
codec.encode_octet (int (value))
+ elif typecode == 12: # FLOAT
+ codec.encode_float (float (value))
+ elif typecode == 13: # DOUBLE
+ codec.encode_double (double (value))
else:
raise ValueError ("Invalid type code: %d" % typecode)
def decodeValue (self, codec, typecode):
+ """ Decode, from the codec, a value based on its typecode. """
if typecode == 1:
data = codec.decode_octet ()
elif typecode == 2:
@@ -111,17 +279,119 @@ class ManagementMetadata:
data = codec.decode_longlong ()
elif typecode == 11: # BOOL
data = codec.decode_octet ()
+ elif typecode == 12: # FLOAT
+ data = codec.decode_float ()
+ elif typecode == 13: # DOUBLE
+ data = codec.decode_double ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
+
+ def handleMethodReply (self, ch, codec):
+ sequence = codec.decode_long ()
+ status = codec.decode_long ()
+ sText = codec.decode_shortstr ()
+
+ data = self.seqMgr.release (sequence)
+ if data == None:
+ return
+
+ (userSequence, classId, methodName) = data
+ args = {}
+
+ if status == 0:
+ schemaClass = self.schema[classId]
+ ms = schemaClass['M']
+ arglist = None
+ for mname in ms:
+ (mdesc, margs) = ms[mname]
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ return
+
+ for arg in arglist:
+ if arg[2].find("O") != -1:
+ args[arg[0]] = self.decodeValue (codec, arg[1])
+
+ if self.methodCb != None:
+ self.methodCb (ch.context, userSequence, status, sText, args)
+
+ def handleInit (self, ch, codec):
+ len = codec.decode_short ()
+ data = codec.decode_raw (len)
+ if self.ctrlCb != None:
+ self.ctrlCb (ch.context, len, data)
+
+ # Send a package request
+ sendCodec = Codec (StringIO (), self.spec)
+ self.setHeader (sendCodec, ord ('P'))
+ smsg = Content (sendCodec.stream.getvalue ())
+ smsg["content_type"] = "application/octet-stream"
+ smsg["routing_key"] = "agent"
+ smsg["reply_to"] = self.spec.struct ("reply_to")
+ smsg["reply_to"]["exchange_name"] = "amq.direct"
+ smsg["reply_to"]["routing_key"] = ch.replyName
+ ch.send ("qpid.management", smsg)
- def parseSchema (self, cls, codec):
+ def handlePackageInd (self, ch, codec):
+ pname = codec.decode_shortstr ()
+ if pname not in self.packages:
+ self.packages[pname] = {}
+
+ # Send a class request
+ sendCodec = Codec (StringIO (), self.spec)
+ self.setHeader (sendCodec, ord ('Q'))
+ sendCodec.encode_shortstr (pname)
+ smsg = Content (sendCodec.stream.getvalue ())
+ smsg["content_type"] = "application/octet-stream"
+ smsg["routing_key"] = "agent"
+ smsg["reply_to"] = self.spec.struct ("reply_to")
+ smsg["reply_to"]["exchange_name"] = "amq.direct"
+ smsg["reply_to"]["routing_key"] = ch.replyName
+ ch.send ("qpid.management", smsg)
+
+ def handleClassInd (self, ch, codec):
+ pname = codec.decode_shortstr ()
+ cname = codec.decode_shortstr ()
+ hash = codec.decode_bin128 ()
+ if pname not in self.packages:
+ return
+
+ if (cname, hash) not in self.packages[pname]:
+ # Send a schema request
+ sendCodec = Codec (StringIO (), self.spec)
+ self.setHeader (sendCodec, ord ('S'))
+ sendCodec.encode_shortstr (pname)
+ sendCodec.encode_shortstr (cname)
+ sendCodec.encode_bin128 (hash)
+ smsg = Content (sendCodec.stream.getvalue ())
+ smsg["content_type"] = "application/octet-stream"
+ smsg["routing_key"] = "agent"
+ smsg["reply_to"] = self.spec.struct ("reply_to")
+ smsg["reply_to"]["exchange_name"] = "amq.direct"
+ smsg["reply_to"]["routing_key"] = ch.replyName
+ ch.send ("qpid.management", smsg)
+
+ def parseSchema (self, ch, cls, codec):
+ """ Parse a received schema-description message. """
+ packageName = codec.decode_shortstr ()
className = codec.decode_shortstr ()
+ hash = codec.decode_bin128 ()
configCount = codec.decode_short ()
instCount = codec.decode_short ()
methodCount = codec.decode_short ()
eventCount = codec.decode_short ()
+ if packageName not in self.packages:
+ return
+ if (className, hash) in self.packages[packageName]:
+ return
+
+ classKey = (packageName, className, hash)
+ if classKey in self.schema:
+ return
+
configs = []
insts = []
methods = {}
@@ -213,25 +483,29 @@ class ManagementMetadata:
args.append (arg)
methods[mname] = (mdesc, args)
+ schemaClass = {}
+ schemaClass['C'] = configs
+ schemaClass['I'] = insts
+ schemaClass['M'] = methods
+ schemaClass['E'] = events
+ self.schema[classKey] = schemaClass
- self.schema[(className,'C')] = configs
- self.schema[(className,'I')] = insts
- self.schema[(className,'M')] = methods
- self.schema[(className,'E')] = events
-
- if self.broker.schema_cb != None:
- self.broker.schema_cb[1] (self.broker.schema_cb[0], className,
- configs, insts, methods, events)
+ if self.schemaCb != None:
+ self.schemaCb (ch.context, classKey, configs, insts, methods, events)
- def parseContent (self, cls, codec):
- if cls == 'C' and self.broker.config_cb == None:
+ def parseContent (self, ch, cls, codec):
+ """ Parse a received content message. """
+ if cls == 'C' and self.configCb == None:
return
- if cls == 'I' and self.broker.inst_cb == None:
+ if cls == 'I' and self.instCb == None:
return
- className = codec.decode_shortstr ()
+ packageName = codec.decode_shortstr ()
+ className = codec.decode_shortstr ()
+ hash = codec.decode_bin128 ()
+ classKey = (packageName, className, hash)
- if (className,cls) not in self.schema:
+ if classKey not in self.schema:
return
row = []
@@ -241,184 +515,49 @@ class ManagementMetadata:
timestamps.append (codec.decode_longlong ()) # Create Time
timestamps.append (codec.decode_longlong ()) # Delete Time
- for element in self.schema[(className,cls)][:]:
+ schemaClass = self.schema[classKey]
+ for element in schemaClass[cls][:]:
tc = element[1]
name = element[0]
data = self.decodeValue (codec, tc)
row.append ((name, data))
- if cls == 'C':
- self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
+ if cls == 'C':
+ self.configCb (ch.context, classKey, row, timestamps)
elif cls == 'I':
- self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps)
-
- def parse (self, codec, opcode, cls):
- if opcode == 'S':
- self.parseSchema (cls, codec)
+ self.instCb (ch.context, classKey, row, timestamps)
+ def parse (self, ch, codec, opcode, cls):
+ """ Parse a message received from the topic queue. """
+ if opcode == 's':
+ self.parseSchema (ch, cls, codec)
elif opcode == 'C':
- self.parseContent (cls, codec)
-
+ self.parseContent (ch, cls, codec)
else:
raise ValueError ("Unknown opcode: %c" % opcode);
- def __init__ (self, broker):
- self.broker = broker
- self.schema = {}
-
-
-class ManagedBroker:
- """An object of this class represents a connection (over AMQP) to a
- single managed broker."""
-
- mExchange = "qpid.management"
- dExchange = "amq.direct"
-
- def setHeader (self, codec, opcode, cls = 0):
- codec.encode_octet (ord ('A'))
- codec.encode_octet (ord ('M'))
- codec.encode_octet (ord ('0'))
- codec.encode_octet (ord ('1'))
- codec.encode_octet (opcode)
- codec.encode_octet (cls)
-
- def checkHeader (self, codec):
- octet = chr (codec.decode_octet ())
- if octet != 'A':
- return None
- octet = chr (codec.decode_octet ())
- if octet != 'M':
- return None
- octet = chr (codec.decode_octet ())
- if octet != '0':
- return None
- octet = chr (codec.decode_octet ())
- if octet != '1':
- return None
- opcode = chr (codec.decode_octet ())
- cls = chr (codec.decode_octet ())
- return (opcode, cls)
-
- def publish_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
-
- hdr = self.checkHeader (codec)
- if hdr == None:
- raise ValueError ("outer header invalid");
-
- self.metadata.parse (codec, hdr[0], hdr[1])
- msg.complete ()
-
- def reply_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
- hdr = self.checkHeader (codec)
- if hdr == None:
- msg.complete ()
- return
- if hdr[0] != 'R':
- msg.complete ()
- return
-
- sequence = codec.decode_long ()
- status = codec.decode_long ()
- sText = codec.decode_shortstr ()
-
- data = self.sequenceManager.release (sequence)
- if data == None:
- msg.complete ()
- return
-
- (userSequence, className, methodName) = data
- args = {}
-
- if status == 0:
- ms = self.metadata.schema[(className,'M')]
- arglist = None
- for mname in ms:
- (mdesc, margs) = ms[mname]
- if mname == methodName:
- arglist = margs
- if arglist == None:
- msg.complete ()
- return
-
- for arg in arglist:
- if arg[2].find("O") != -1:
- args[arg[0]] = self.metadata.decodeValue (codec, arg[1])
-
- if self.method_cb != None:
- self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args)
-
- msg.complete ()
-
- def __init__ (self,
- host = "localhost",
- port = 5672,
- username = "guest",
- password = "guest",
- specfile = "/usr/share/amqp/amqp.0-10-preview.xml"):
-
- self.spec = qpid.spec.load (specfile)
- self.client = None
- self.channel = None
- self.queue = None
- self.rqueue = None
- self.qname = None
- self.rqname = None
- self.metadata = ManagementMetadata (self)
- self.sequenceManager = SequenceManager ()
- self.connected = 0
- self.lastConnectError = None
-
- # Initialize the callback records
- self.status_cb = None
- self.schema_cb = None
- self.config_cb = None
- self.inst_cb = None
- self.method_cb = None
-
- self.host = host
- self.port = port
- self.username = username
- self.password = password
-
- def statusListener (self, context, callback):
- self.status_cb = (context, callback)
-
- def schemaListener (self, context, callback):
- self.schema_cb = (context, callback)
-
- def configListener (self, context, callback):
- self.config_cb = (context, callback)
-
- def methodListener (self, context, callback):
- self.method_cb = (context, callback)
-
- def instrumentationListener (self, context, callback):
- self.inst_cb = (context, callback)
-
- def method (self, userSequence, objId, className,
- methodName, args=None, packageName="qpid"):
- codec = Codec (StringIO (), self.spec);
- sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+ def method (self, channel, userSequence, objId, classId, methodName, args):
+ """ Invoke a method on an object """
+ codec = Codec (StringIO (), self.spec)
+ sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'))
codec.encode_long (sequence) # Method sequence id
codec.encode_longlong (objId) # ID of object
- #codec.encode_shortstr (self.rqname) # name of reply queue
# Encode args according to schema
- if (className,'M') not in self.metadata.schema:
- self.sequenceManager.release (sequence)
- raise ValueError ("Unknown class name: %s" % className)
+ if classId not in self.schema:
+ self.seqMgr.release (sequence)
+ raise ValueError ("Unknown class name: %s" % classId)
- ms = self.metadata.schema[(className,'M')]
- arglist = None
+ schemaClass = self.schema[classId]
+ ms = schemaClass['M']
+ arglist = None
for mname in ms:
(mdesc, margs) = ms[mname]
if mname == methodName:
arglist = margs
if arglist == None:
- self.sequenceManager.release (sequence)
+ self.seqMgr.release (sequence)
raise ValueError ("Unknown method name: %s" % methodName)
for arg in arglist:
@@ -427,65 +566,17 @@ class ManagedBroker:
if arg[0] in args:
value = args[arg[0]]
if value == None:
- self.sequenceManager.release (sequence)
+ self.seqMgr.release (sequence)
raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
- self.metadata.encodeValue (codec, value, arg[1])
+ self.encodeValue (codec, value, arg[1])
+ packageName = classId[0]
+ className = classId[1]
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "method." + packageName + "." + className + "." + methodName
+ msg["routing_key"] = "agent.method." + packageName + "." + \
+ className + "." + methodName
msg["reply_to"] = self.spec.struct ("reply_to")
msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = self.rqname
- self.channel.message_transfer (destination="qpid.management", content=msg)
-
- def isConnected (self):
- return connected
-
- def start (self):
- print "Connecting to broker %s:%d" % (self.host, self.port)
-
- try:
- self.client = Client (self.host, self.port, self.spec)
- self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
- self.channel = self.client.channel (1)
- response = self.channel.session_open (detached_lifetime=300)
- self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
- self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
-
- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
- self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
-
- self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
- routing_key="mgmt.#")
- self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
- routing_key=self.rqname)
-
- self.channel.message_subscribe (queue=self.qname, destination="mdest")
- self.channel.message_subscribe (queue=self.rqname, destination="rdest")
-
- self.queue = self.client.queue ("mdest")
- self.queue.listen (self.publish_cb)
-
- self.channel.message_flow_mode (destination="mdest", mode=1)
- self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
- self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
-
- self.rqueue = self.client.queue ("rdest")
- self.rqueue.listen (self.reply_cb)
-
- self.channel.message_flow_mode (destination="rdest", mode=1)
- self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
- self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
-
- self.connected = 1
-
- except socket.error, e:
- print "Socket Error:", e[1]
- self.lastConnectError = e
- raise
- except:
- raise
-
- def stop (self):
- pass
+ msg["reply_to"]["routing_key"] = channel.replyName
+ channel.send ("qpid.management", msg)
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index eab1033805..33c41fb884 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -47,7 +47,11 @@
<class name="system">
<configElement name="sysId" index="y" type="sstr" access="RC"/>
- <!-- RT config/instrumentation TBD -->
+ <instElement name="osName" type="sstr" desc="Operating System Name"/>
+ <instElement name="nodeName" type="sstr" desc="Node Name"/>
+ <instElement name="release" type="sstr"/>
+ <instElement name="version" type="sstr"/>
+ <instElement name="machine" type="sstr"/>
</class>
@@ -57,20 +61,18 @@
===============================================================
-->
<class name="broker">
- <configElement name="systemRef" type="objId" access="RC" index="y" desc="System ID"/>
+ <configElement name="systemRef" type="objId" access="RC" index="y" desc="System ID" parentRef="y"/>
<configElement name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
<configElement name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/>
<configElement name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/>
<configElement name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
<configElement name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
- <configElement name="storeLib" type="sstr" access="RO" desc="Name of persistent storage library"/>
- <configElement name="asyncStore" type="bool" access="RO" desc="Use async persistent store"/>
<configElement name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
- <configElement name="initialDiskPageSize" type="uint32" access="RO" desc="Number of disk pages allocated for storage"/>
- <configElement name="initialPagesPerQueue" type="uint32" access="RO" desc="Number of disk pages allocated per queue"/>
<configElement name="clusterName" type="sstr" access="RO"
- desc="Name of cluster this server is a member of, zero-length for standalone server"/>
+ desc="Name of cluster this server is a member of"/>
<configElement name="version" type="sstr" access="RO" desc="Running software version"/>
+ <configElement name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/>
+ <configElement name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/>
<method name="joinCluster">
<arg name="clusterName" dir="I" type="sstr"/>
@@ -137,9 +139,7 @@
<instElement name="consumers" type="hilo32" unit="consumer" desc="Current consumers on queue"/>
<instElement name="bindings" type="hilo32" unit="binding" desc="Current bindings"/>
<instElement name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/>
- <instElement name="messageLatencyMin" type="uint64" unit="nanosecond" desc="Minimum broker latency through this queue"/>
- <instElement name="messageLatencyMax" type="uint64" unit="nanosecond" desc="Maximum broker latency through this queue"/>
- <instElement name="messageLatencyAvg" type="uint64" unit="nanosecond" desc="Average broker latency through this queue"/>
+ <instElement name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/>
<method name="purge" desc="Discard all messages on queue"/>
</class>
@@ -203,6 +203,9 @@
===============================================================
-->
<class name="link">
+
+ This class represents an inter-broker connection.
+
<configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/>
<configElement name="address" type="sstr" access="RC" index="y"/>
diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml
index 842a18cb30..6c86be3db1 100644
--- a/qpid/specs/management-types.xml
+++ b/qpid/specs/management-types.xml
@@ -29,6 +29,8 @@
<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/>
+<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/>
@@ -41,8 +43,9 @@
<type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0"/>
<!-- Min/Max/Average statistics -->
-<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/>
-<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
<!-- Some Proposed Syntax for User-Defined Types:
<enum name="enumeratedType" base="U8">