summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-06-30 19:00:49 +0000
committerTed Ross <tross@apache.org>2008-06-30 19:00:49 +0000
commitd2051d8e6910c4cbcd9c2ce2ef01089360f83e43 (patch)
tree14142fcee4c5aa5decfaf138f2d04e8d6f1b9651
parent061d6a61e73c8d4e43a711e526d6586db9f54c01 (diff)
downloadqpid-python-d2051d8e6910c4cbcd9c2ce2ef01089360f83e43.tar.gz
QPID-1160 - Per-thread counters in management API to avoid locking
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@672864 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/managementgen/generate.py62
-rwxr-xr-xqpid/cpp/managementgen/schema.py251
-rw-r--r--qpid/cpp/managementgen/templates/Class.cpp56
-rw-r--r--qpid/cpp/managementgen/templates/Class.h33
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp21
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/System.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Vhost.cpp2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp0
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.cpp10
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.h6
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp14
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h29
-rw-r--r--qpid/specs/management-schema.xml29
-rw-r--r--qpid/specs/management-types.xml25
21 files changed, 394 insertions, 188 deletions
diff --git a/qpid/cpp/managementgen/generate.py b/qpid/cpp/managementgen/generate.py
index da78d6c7e9..197deec4f1 100755
--- a/qpid/cpp/managementgen/generate.py
+++ b/qpid/cpp/managementgen/generate.py
@@ -30,39 +30,62 @@ class Template:
Expandable File Template - This class is instantiated each time a
template is to be expanded. It is instantiated with the "filename"
which is the full path to the template file and the "handler" which
- is an object that is responsible for storing variables (setVariable)
- and expanding tags (substHandler).
+ is an object that is responsible for storing variables (setVariable),
+ checking conditions (testCondition), and expanding tags (substHandler).
"""
def __init__ (self, filename, handler):
self.filename = filename
self.handler = handler
self.handler.initExpansion ()
+ self.writing = True
def expandLine (self, line, stream, object):
cursor = 0
while 1:
sub = line.find ("/*MGEN:", cursor)
if sub == -1:
- stream.write (line[cursor:len (line)])
+ if self.writing:
+ stream.write (line[cursor:len (line)])
return
subend = line.find("*/", sub)
- stream.write (line[cursor:sub])
+ if self.writing:
+ stream.write (line[cursor:sub])
cursor = subend + 2
- tag = line[sub:subend]
- equalPos = tag.find ("=")
- if equalPos == -1:
- dotPos = tag.find (".")
+ tag = line[sub:subend]
+
+ if tag[7:10] == "IF(":
+ close = tag.find(")")
+ if close == -1:
+ raise ValueError ("Missing ')' on condition")
+ cond = tag[10:close]
+ dotPos = cond.find (".")
if dotPos == -1:
- raise ValueError ("Invalid tag: %s" % tag)
- tagObject = tag[7:dotPos]
- tagName = tag[dotPos + 1:len (tag)]
- self.handler.substHandler (object, stream, tagObject, tagName)
+ raise ValueError ("Invalid condition tag: %s" % cond)
+ tagObject = cond[0:dotPos]
+ tagName = cond[dotPos + 1 : len(cond)]
+ if not self.handler.testCondition(object, tagObject, tagName):
+ self.writing = False
+
+ elif tag[7:12] == "ENDIF":
+ self.writing = True
+
else:
- tagKey = tag[7:equalPos]
- tagVal = tag[equalPos + 1:len (tag)]
- self.handler.setVariable (tagKey, tagVal)
+ equalPos = tag.find ("=")
+ if equalPos == -1:
+ dotPos = tag.find (".")
+ if dotPos == -1:
+ raise ValueError ("Invalid tag: %s" % tag)
+ tagObject = tag[7:dotPos]
+ tagName = tag[dotPos + 1:len (tag)]
+ if self.writing:
+ self.handler.substHandler (object, stream, tagObject, tagName)
+ else:
+ tagKey = tag[7:equalPos]
+ tagVal = tag[equalPos + 1:len (tag)]
+ if self.writing:
+ self.handler.setVariable (tagKey, tagVal)
def expand (self, object):
fd = open (self.filename)
@@ -224,6 +247,15 @@ class Generator:
call = obj + ".gen" + tag + "(stream, self.variables)"
eval (call)
+ def testCondition (self, object, tagObject, tag):
+ if tagObject == "Root":
+ obj = "self"
+ else:
+ obj = "object" # MUST be the same as the 2nd formal parameter
+
+ call = obj + ".test" + tag + "(self.variables)"
+ return eval (call)
+
def setVariable (self, key, value):
self.variables[key] = value
diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py
index 4e1f898274..4f5dc216ab 100755
--- a/qpid/cpp/managementgen/schema.py
+++ b/qpid/cpp/managementgen/schema.py
@@ -26,14 +26,15 @@ import md5
#=====================================================================================
class SchemaType:
def __init__ (self, node):
- self.name = None
- self.base = None
- self.cpp = None
- self.encode = None
- self.decode = None
- self.style = "normal"
- self.accessor = None
- self.init = "0"
+ self.name = None
+ self.base = None
+ self.cpp = None
+ self.encode = None
+ self.decode = None
+ self.style = "normal"
+ self.accessor = None
+ self.init = "0"
+ self.perThread = False
attrs = node.attributes
for idx in range (attrs.length):
@@ -63,6 +64,11 @@ class SchemaType:
elif key == 'init':
self.init = val
+ elif key == 'perThread':
+ if val != 'y':
+ raise ValueError ("Expected 'y' in perThread attribute")
+ self.perThread = True
+
else:
raise ValueError ("Unknown attribute in type '%s'" % key)
@@ -74,43 +80,38 @@ class SchemaType:
return self.name
def genAccessor (self, stream, varName, changeFlag = None):
+ if self.perThread:
+ prefix = "getThreadStats()->"
+ if self.style == "wm":
+ raise ValueError ("'wm' style types can't be per-thread")
+ else:
+ prefix = ""
if self.accessor == "direct":
stream.write (" inline void set_" + varName + " (" + self.cpp + " val){\n");
- stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
+ if not self.perThread:
+ stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
if self.style != "mma":
- stream.write (" " + varName + " = val;\n");
+ stream.write (" " + prefix + varName + " = val;\n");
if self.style == "wm":
stream.write (" if (" + varName + "Low > val)\n")
stream.write (" " + varName + "Low = val;\n")
stream.write (" if (" + varName + "High < val)\n")
stream.write (" " + varName + "High = val;\n")
if self.style == "mma":
- stream.write (" " + varName + "Count++;\n")
- stream.write (" " + varName + "Total += val;\n")
- stream.write (" if (" + varName + "Min > val)\n")
- stream.write (" " + varName + "Min = val;\n")
- stream.write (" if (" + varName + "Max < val)\n")
- stream.write (" " + varName + "Max = val;\n")
- if changeFlag != None:
- stream.write (" " + changeFlag + " = true;\n")
- stream.write (" }\n");
- elif self.accessor == "counterByOne":
- stream.write (" inline void inc_" + varName + " (){\n");
- stream.write (" ++" + varName + ";\n")
- if changeFlag != None:
- stream.write (" " + changeFlag + " = true;\n")
- stream.write (" }\n");
- stream.write (" inline void dec_" + varName + " (){\n");
- stream.write (" --" + varName + ";\n")
+ stream.write (" " + prefix + varName + "Count++;\n")
+ stream.write (" " + prefix + varName + "Total += val;\n")
+ stream.write (" if (" + prefix + varName + "Min > val)\n")
+ stream.write (" " + prefix + varName + "Min = val;\n")
+ stream.write (" if (" + prefix + varName + "Max < val)\n")
+ stream.write (" " + prefix + varName + "Max = val;\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
stream.write (" }\n");
elif self.accessor == "counter":
stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n");
- stream.write (" if (by == 1)\n")
- stream.write (" ++" + varName + ";\n")
- stream.write (" else\n")
- stream.write (" " + varName + " += by;\n")
+ if not self.perThread:
+ stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
+ stream.write (" " + prefix + varName + " += by;\n")
if self.style == "wm":
stream.write (" if (" + varName + "High < " + varName + ")\n")
stream.write (" " + varName + "High = " + varName + ";\n")
@@ -118,27 +119,15 @@ class SchemaType:
stream.write (" " + changeFlag + " = true;\n")
stream.write (" }\n");
stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n");
- stream.write (" if (by == 1)\n")
- stream.write (" " + varName + "--;\n")
- stream.write (" else\n")
- stream.write (" " + varName + " -= by;\n")
+ if not self.perThread:
+ stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
+ stream.write (" " + prefix + varName + " -= by;\n")
if self.style == "wm":
stream.write (" if (" + varName + "Low > " + varName + ")\n")
stream.write (" " + varName + "Low = " + varName + ";\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
stream.write (" }\n");
- stream.write (" inline void set_" + varName + " (" + self.cpp + " val){\n");
- stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
- stream.write (" " + varName + " = val;\n");
- if self.style == "wm":
- stream.write (" if (" + varName + "Low > val)\n")
- stream.write (" " + varName + "Low = val;\n")
- stream.write (" if (" + varName + "High < val)\n")
- stream.write (" " + varName + "High = val;\n")
- if changeFlag != None:
- stream.write (" " + changeFlag + " = true;\n")
- stream.write (" }\n");
def genHiLoStatResets (self, stream, varName):
if self.style == "wm":
@@ -150,6 +139,13 @@ class SchemaType:
stream.write (" " + varName + "Min = -1;\n")
stream.write (" " + varName + "Max = 0;\n")
+ def genPerThreadHiLoStatResets (self, stream, varName):
+ if self.style == "mma":
+ stream.write (" threadStats->" + varName + "Count = 0;\n")
+ stream.write (" threadStats->" + varName + "Total = 0;\n")
+ stream.write (" threadStats->" + varName + "Min = -1;\n")
+ stream.write (" threadStats->" + varName + "Max = 0;\n")
+
def genWrite (self, stream, varName):
if self.style != "mma":
stream.write (" " + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n")
@@ -235,6 +231,8 @@ class SchemaConfig:
elif key == 'type':
self.type = Type (val, typespec)
+ if self.type.type.accessor != 'direct':
+ raise ValueError ("Class properties must have a type with a direct accessor")
elif key == 'references':
self.ref = val
@@ -288,8 +286,8 @@ class SchemaConfig:
return 1
return 0
- def genDeclaration (self, stream):
- stream.write (" " + self.type.type.cpp + " " + self.name + ";\n")
+ def genDeclaration (self, stream, prefix=" "):
+ stream.write (prefix + self.type.type.cpp + " " + self.name + ";\n")
def genFormalParam (self, stream):
stream.write (self.type.type.cpp + " _" + self.name)
@@ -360,17 +358,17 @@ class SchemaInst:
def getName (self):
return self.name
- def genDeclaration (self, stream):
+ def genDeclaration (self, stream, prefix=" "):
if self.type.type.style != "mma":
- stream.write (" " + self.type.type.cpp + " " + self.name + ";\n")
+ stream.write (prefix + self.type.type.cpp + " " + self.name + ";\n")
if self.type.type.style == 'wm':
- stream.write (" " + self.type.type.cpp + " " + self.name + "High;\n")
- stream.write (" " + self.type.type.cpp + " " + self.name + "Low;\n")
+ stream.write (prefix + self.type.type.cpp + " " + self.name + "High;\n")
+ stream.write (prefix + self.type.type.cpp + " " + self.name + "Low;\n")
if self.type.type.style == "mma":
- stream.write (" " + self.type.type.cpp + " " + self.name + "Count;\n")
- stream.write (" uint64_t " + self.name + "Total;\n")
- stream.write (" " + self.type.type.cpp + " " + self.name + "Min;\n")
- stream.write (" " + self.type.type.cpp + " " + self.name + "Max;\n")
+ stream.write (prefix + self.type.type.cpp + " " + self.name + "Count;\n")
+ stream.write (prefix + "uint64_t " + self.name + "Total;\n")
+ stream.write (prefix + self.type.type.cpp + " " + self.name + "Min;\n")
+ stream.write (prefix + self.type.type.cpp + " " + self.name + "Max;\n")
def genAccessor (self, stream):
self.type.type.genAccessor (stream, self.name, "instChanged")
@@ -378,6 +376,9 @@ class SchemaInst:
def genHiLoStatResets (self, stream):
self.type.type.genHiLoStatResets (stream, self.name)
+ def genPerThreadHiLoStatResets (self, stream):
+ self.type.type.genPerThreadHiLoStatResets (stream, self.name)
+
def genSchemaText (self, stream, name, desc):
stream.write (" ft = FieldTable ();\n")
stream.write (" ft.setString (NAME, \"" + name + "\");\n")
@@ -416,26 +417,51 @@ class SchemaInst:
def genAssign (self, stream):
if self.assign != None:
- stream.write (" " + self.name + " = (" + self.type.type.cpp + ") (" + self.assign + ");\n")
+ if self.type.type.perThread:
+ prefix = " threadStats->"
+ else:
+ prefix = ""
+ stream.write (" " + prefix + self.name + " = (" + self.type.type.cpp +
+ ") (" + self.assign + ");\n")
def genWrite (self, stream):
- self.type.type.genWrite (stream, self.name)
+ if self.type.type.perThread:
+ self.type.type.genWrite (stream, "totals." + self.name)
+ else:
+ self.type.type.genWrite (stream, self.name)
- def genInitialize (self, stream):
+ def genInitialize (self, stream, prefix="", indent=" "):
val = self.type.type.init
- if self.type.type.accessor == "counterByOne":
- return
if self.type.type.style != "mma":
- stream.write (" " + self.name + " = " + val + ";\n")
+ stream.write (indent + prefix + self.name + " = " + val + ";\n")
if self.type.type.style == "wm":
- stream.write (" " + self.name + "High = " + val + ";\n")
- stream.write (" " + self.name + "Low = " + val + ";\n")
+ stream.write (indent + prefix + self.name + "High = " + val + ";\n")
+ stream.write (indent + prefix + self.name + "Low = " + val + ";\n")
if self.type.type.style == "mma":
- stream.write (" " + self.name + "Count = 0;\n")
- stream.write (" " + self.name + "Min = -1;\n")
- stream.write (" " + self.name + "Max = 0;\n")
- stream.write (" " + self.name + "Total = 0;\n")
+ stream.write (indent + prefix + self.name + "Count = 0;\n")
+ stream.write (indent + prefix + self.name + "Min = -1;\n")
+ stream.write (indent + prefix + self.name + "Max = 0;\n")
+ stream.write (indent + prefix + self.name + "Total = 0;\n")
+ def genInitializeTotalPerThreadStats (self, stream):
+ if self.type.type.style == "mma":
+ stream.write (" totals->" + self.name + "Count = 0;\n")
+ stream.write (" totals->" + self.name + "Min = -1;\n")
+ stream.write (" totals->" + self.name + "Max = 0;\n")
+ stream.write (" totals->" + self.name + "Total = 0;\n")
+ else:
+ stream.write (" totals->" + self.name + " = 0;\n")
+
+ def genAggregatePerThreadStats (self, stream):
+ if self.type.type.style == "mma":
+ stream.write (" totals->%sCount += threadStats->%sCount;\n" % (self.name, self.name))
+ stream.write (" if (totals->%sMin > threadStats->%sMin)\n" % (self.name, self.name))
+ stream.write (" totals->%sMin = threadStats->%sMin;\n" % (self.name, self.name))
+ stream.write (" if (totals->%sMax < threadStats->%sMax)\n" % (self.name, self.name))
+ stream.write (" totals->%sMax = threadStats->%sMax;\n" % (self.name, self.name))
+ stream.write (" totals->%sTotal += threadStats->%sTotal;\n" % (self.name, self.name))
+ else:
+ stream.write (" totals->%s += threadStats->%s;\n" % (self.name, self.name))
#=====================================================================================
#
@@ -664,6 +690,26 @@ class SchemaClass:
else:
raise ValueError ("Unknown class tag '%s'" % child.nodeName)
+ # Adjust the 'assign' attributes for each statistic
+ for stat in self.statistics:
+ if stat.assign != None and stat.type.type.perThread:
+ stat.assign = self.adjust (stat.assign, self.statistics)
+
+ def adjust (self, text, statistics):
+ result = text
+ start = 0
+ while True:
+ next = None
+ for stat in statistics:
+ pos = result.find (stat.name, start)
+ if pos != -1 and (next == None or pos < next[0]):
+ next = (pos, stat.name)
+ if next == None:
+ return result
+ pos = next[0]
+ result = result[0:pos] + "threadStats->" + result[pos:]
+ start = pos + 9 + len(next[1])
+
def hash (self, node):
attrs = node.attributes
self.md5Sum.update (node.nodeName)
@@ -711,12 +757,34 @@ class SchemaClass:
# Code Generation Functions. The names of these functions (minus the leading "gen")
# match the substitution keywords in the template files.
#===================================================================================
+ def testExistPerThreadStats (self, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread:
+ return True
+ return False
+
+ def testExistPerThreadAssign (self, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread and inst.assign != None:
+ return True
+ return False
+
+ def testExistPerThreadResets (self, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread and inst.type.type.style == "mma":
+ return True
+ return False
+
+ def testNoStatistics (self, variables):
+ return len (self.statistics) == 0
+
def genAccessorMethods (self, stream, variables):
for config in self.properties:
if config.access != "RC":
config.genAccessor (stream)
for inst in self.statistics:
- inst.genAccessor (stream)
+ if inst.assign == None:
+ inst.genAccessor (stream)
def genConfigCount (self, stream, variables):
stream.write ("%d" % len (self.properties))
@@ -767,16 +835,33 @@ class SchemaClass:
def genHiLoStatResets (self, stream, variables):
for inst in self.statistics:
- inst.genHiLoStatResets (stream)
+ if not inst.type.type.perThread:
+ inst.genHiLoStatResets (stream)
+
+ def genPerThreadHiLoStatResets (self, stream, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread:
+ inst.genPerThreadHiLoStatResets (stream)
def genInitializeElements (self, stream, variables):
for inst in self.statistics:
- inst.genInitialize (stream)
+ if not inst.type.type.perThread:
+ inst.genInitialize (stream)
+
+ def genInitializePerThreadElements (self, stream, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread:
+ inst.genInitialize (stream, "threadStats->", " ")
+
+ def genInitializeTotalPerThreadStats (self, stream, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread:
+ inst.genInitializeTotalPerThreadStats (stream)
- def genInstChangedStub (self, stream, variables):
- if len (self.statistics) == 0:
- stream.write (" // Stub for getInstChanged. There are no inst elements\n")
- stream.write (" bool getInstChanged (void) { return false; }\n")
+ def genAggregatePerThreadStats (self, stream, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread:
+ inst.genAggregatePerThreadStats (stream)
def genInstCount (self, stream, variables):
count = 0
@@ -790,7 +875,13 @@ class SchemaClass:
def genInstDeclarations (self, stream, variables):
for element in self.statistics:
- element.genDeclaration (stream)
+ if not element.type.type.perThread:
+ element.genDeclaration (stream)
+
+ def genPerThreadDeclarations (self, stream, variables):
+ for element in self.statistics:
+ if element.type.type.perThread:
+ element.genDeclaration (stream, " ")
def genInstElementSchema (self, stream, variables):
for inst in self.statistics:
@@ -884,7 +975,13 @@ class SchemaClass:
def genAssign (self, stream, variables):
for inst in self.statistics:
- inst.genAssign (stream)
+ if not inst.type.type.perThread:
+ inst.genAssign (stream)
+
+ def genPerThreadAssign (self, stream, variables):
+ for inst in self.statistics:
+ if inst.type.type.perThread:
+ inst.genAssign (stream)
def genWriteConfig (self, stream, variables):
for config in self.properties:
diff --git a/qpid/cpp/managementgen/templates/Class.cpp b/qpid/cpp/managementgen/templates/Class.cpp
index 0fbb78b7f1..1ea1da5090 100644
--- a/qpid/cpp/managementgen/templates/Class.cpp
+++ b/qpid/cpp/managementgen/templates/Class.cpp
@@ -23,6 +23,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementAgent.h"
#include "/*MGEN:Class.NameCap*/.h"
/*MGEN:Class.MethodArgIncludes*/
@@ -36,15 +37,28 @@ string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/
uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] =
{/*MGEN:Class.SchemaMD5*/};
-/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) :
- ManagementObject(_core)
- /*MGEN:Class.ConstructorInits*/
+/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent* _agent, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) :
+ ManagementObject(_agent, _core)/*MGEN:Class.ConstructorInits*/
{
/*MGEN:Class.ParentRefAssignment*/
/*MGEN:Class.InitializeElements*/
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+ maxThreads = agent->getMaxThreads();
+ perThreadStatsArray = new struct PerThreadStats*[maxThreads];
+ for (int idx = 0; idx < maxThreads; idx++)
+ perThreadStatsArray[idx] = 0;
+/*MGEN:ENDIF*/
}
-/*MGEN:Class.NameCap*/::~/*MGEN:Class.NameCap*/ () {}
+/*MGEN:Class.NameCap*/::~/*MGEN:Class.NameCap*/ ()
+{
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+ for (int idx = 0; idx < maxThreads; idx++)
+ if (perThreadStatsArray[idx] != 0)
+ delete perThreadStatsArray[idx];
+ delete perThreadStatsArray;
+/*MGEN:ENDIF*/
+}
namespace {
const string NAME("name");
@@ -85,6 +99,19 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
/*MGEN:Class.EventSchema*/
}
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* totals)
+{
+/*MGEN:Class.InitializeTotalPerThreadStats*/
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+/*MGEN:Class.AggregatePerThreadStats*/
+ }
+ }
+}
+/*MGEN:ENDIF*/
+
void /*MGEN:Class.NameCap*/::writeProperties (Buffer& buf)
{
sys::Mutex::ScopedLock mutex(accessLock);
@@ -98,14 +125,33 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders)
{
sys::Mutex::ScopedLock mutex(accessLock);
instChanged = false;
+/*MGEN:IF(Class.ExistPerThreadAssign)*/
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+/*MGEN:Class.PerThreadAssign*/
+ }
+ }
+/*MGEN:ENDIF*/
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+ struct PerThreadStats totals;
+ aggregatePerThreadStats(&totals);
+/*MGEN:ENDIF*/
/*MGEN:Class.Assign*/
-
if (!skipHeaders)
writeTimestamps (buf);
/*MGEN:Class.WriteInst*/
// Maintenance of hi-lo statistics
/*MGEN:Class.HiLoStatResets*/
+/*MGEN:IF(Class.ExistPerThreadResets)*/
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+/*MGEN:Class.PerThreadHiLoStatResets*/
+ }
+ }
+/*MGEN:ENDIF*/
}
void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/)
diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h
index aa01f8f105..557c7a45d5 100644
--- a/qpid/cpp/managementgen/templates/Class.h
+++ b/qpid/cpp/managementgen/templates/Class.h
@@ -26,7 +26,6 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
-#include "qpid/sys/AtomicCount.h"
namespace qpid {
namespace management {
@@ -43,6 +42,27 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
/*MGEN:Class.ConfigDeclarations*/
// Statistics
/*MGEN:Class.InstDeclarations*/
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+ // Per-Thread Statistics
+ struct PerThreadStats {
+/*MGEN:Class.PerThreadDeclarations*/
+ };
+
+ struct PerThreadStats** perThreadStatsArray;
+
+ inline struct PerThreadStats* getThreadStats() {
+ int index = getThreadIndex();
+ struct PerThreadStats* threadStats = perThreadStatsArray[index];
+ if (threadStats == 0) {
+ threadStats = new(PerThreadStats);
+ perThreadStatsArray[index] = threadStats;
+/*MGEN:Class.InitializePerThreadElements*/
+ }
+ return threadStats;
+ }
+
+ void aggregatePerThreadStats(struct PerThreadStats*);
+/*MGEN:ENDIF*/
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
void writeProperties (qpid::framing::Buffer& buf);
@@ -51,15 +71,18 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf);
- writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; }
-
-/*MGEN:Class.InstChangedStub*/
+ writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
+/*MGEN:IF(Class.NoStatistics)*/
+ // Stub for getInstChanged. There are no statistics in this class.
+ bool getInstChanged (void) { return false; }
+/*MGEN:ENDIF*/
public:
friend class Package/*MGEN:Class.NamePackageCap*/;
typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr;
- /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
+ /*MGEN:Class.NameCap*/ (ManagementAgent* agent,
+ Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
~/*MGEN:Class.NameCap*/ (void);
/*MGEN:Class.SetGeneralReferenceDeclaration*/
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 18b2c52dad..9274de0555 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -38,13 +38,17 @@ namespace broker {
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const management::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args),
- mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest,
- args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes)),
listener(l), name(Uuid(true).str()), persistenceId(0)
{
- if (!args.i_durable)
- management::ManagementAgent::getAgent()->addObject(mgmtObject);
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
+ if (agent.get() != 0) {
+ mgmtObject = management::Bridge::shared_ptr
+ (new management::Bridge(agent.get(), this, link, id, args.i_durable, args.i_src, args.i_dest,
+ args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
+ args.i_tag, args.i_excludes));
+ if (!args.i_durable)
+ agent->addObject(mgmtObject);
+ }
}
Bridge::~Bridge()
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index a3dd93899a..0b7886b3ba 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -135,7 +135,7 @@ Broker::Broker(const Broker::Options& conf) :
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
- conf.mgmtPubInterval, this);
+ conf.mgmtPubInterval, this, conf.workerThreads + 3);
managementAgent = management::ManagementAgent::getAgent ();
((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval);
qpid::management::PackageQpid packageInitializer (managementAgent);
@@ -143,7 +143,7 @@ Broker::Broker(const Broker::Options& conf) :
System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
systemObject = System::shared_ptr (system);
- mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port));
+ mgmtObject = management::Broker::shared_ptr (new management::Broker (managementAgent.get(), this, system, conf.port));
mgmtObject->set_workerThreads (conf.workerThreads);
mgmtObject->set_maxConns (conf.maxConnections);
mgmtObject->set_connBacklog (conf.connectionBacklog);
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index b6f6b9cee9..9e763f6775 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -65,7 +65,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
if (agent.get() != 0)
- mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink));
+ mgmtObject = management::Connection::shared_ptr
+ (new management::Connection(agent.get(), this, parent, mgmtId, !isLink));
agent->addObject(mgmtObject);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 30a93e338c..c72b148338 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name, durable));
+ (new management::Exchange (agent.get(), this, parent, _name, durable));
agent->addObject (mgmtExchange);
}
}
@@ -56,7 +56,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
if (agent.get () != 0)
{
mgmtExchange = management::Exchange::shared_ptr
- (new management::Exchange (this, parent, _name, durable));
+ (new management::Exchange (agent.get(), this, parent, _name, durable));
if (!durable) {
if (name == "")
agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID
@@ -134,7 +134,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
{
uint64_t queueId = mo->getObjectId();
mgmtBinding = management::Binding::shared_ptr
- (new management::Binding (this, (Manageable*) parent, queueId, key, args));
+ (new management::Binding (agent.get(), this, (Manageable*) parent, queueId, key, args));
agent->addObject (mgmtBinding);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 630ce68150..87c0020dcb 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -63,7 +63,7 @@ Link::Link(LinkRegistry* _links,
if (agent.get() != 0)
{
mgmtObject = management::Link::shared_ptr
- (new management::Link(this, parent, _host, _port, _useSsl, _durable));
+ (new management::Link(agent.get(), this, parent, _host, _port, _useSsl, _durable));
if (!durable)
agent->addObject(mgmtObject);
}
@@ -109,7 +109,8 @@ void Link::startConnectionLH ()
boost::bind (&Link::closed, this, _1, _2));
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (e.what());
+ if (mgmtObject.get() != 0)
+ mgmtObject->set_lastError (e.what());
}
}
@@ -141,7 +142,8 @@ void Link::closed (int, std::string text)
if (state != STATE_FAILED)
{
setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (text);
+ if (mgmtObject.get() != 0)
+ mgmtObject->set_lastError (text);
}
if (closing)
@@ -257,7 +259,8 @@ void Link::notifyConnectionForced(const string text)
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
- mgmtObject->set_lastError(text);
+ if (mgmtObject.get() != 0)
+ mgmtObject->set_lastError(text);
}
void Link::setPersistenceId(uint64_t id) const
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index becca8dfcf..40f249bc11 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -71,7 +71,7 @@ Queue::Queue(const string& _name, bool _autodelete,
if (agent.get () != 0)
{
mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0));
+ (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0));
// Add the object to the management agent only if this queue is not durable.
// If it's durable, we will add it later when the queue is assigned a persistenceId.
@@ -113,6 +113,7 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
+
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -128,19 +129,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
// if no store then mark as enqueued
if (!enqueue(0, msg)){
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
}
push(msg);
msg->enqueueComplete();
}else {
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
@@ -155,12 +152,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
}
if (store && !msg->isContentLoaded()) {
@@ -173,12 +168,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
- mgmtObject->inc_msgDepth ();
if (msg->isPersistent ()) {
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
@@ -362,10 +355,8 @@ void Queue::consume(Consumer& c, bool requestExclusive){
}
consumerCount++;
- if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
+ if (mgmtObject.get() != 0)
mgmtObject->inc_consumerCount ();
- }
}
void Queue::cancel(Consumer& c){
@@ -373,10 +364,8 @@ void Queue::cancel(Consumer& c){
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
if(exclusive) exclusive = 0;
- if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
+ if (mgmtObject.get() != 0)
mgmtObject->dec_consumerCount ();
- }
}
QueuedMessage Queue::dequeue(){
@@ -413,10 +402,8 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
- Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- mgmtObject->dec_msgDepth ();
if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index d7089424a5..95145e5d0e 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -64,7 +64,7 @@ SessionState::SessionState(
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0) {
mgmtObject = management::Session::shared_ptr
- (new management::Session (this, parent, getId().getName()));
+ (new management::Session (agent.get(), this, parent, getId().getName()));
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
agent->addObject (mgmtObject);
diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp
index da886710ac..107942fab5 100644
--- a/qpid/cpp/src/qpid/broker/System.cpp
+++ b/qpid/cpp/src/qpid/broker/System.cpp
@@ -63,7 +63,7 @@ System::System (string _dataDir)
}
mgmtObject = management::System::shared_ptr
- (new management::System (this, systemId));
+ (new management::System (agent.get(), this, systemId));
struct utsname _uname;
if (uname (&_uname) == 0)
{
diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp
index a809679d57..cfe497c788 100644
--- a/qpid/cpp/src/qpid/broker/Vhost.cpp
+++ b/qpid/cpp/src/qpid/broker/Vhost.cpp
@@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker)
if (agent.get () != 0)
{
mgmtObject = management::Vhost::shared_ptr
- (new management::Vhost (this, parentBroker, "/"));
+ (new management::Vhost (agent.get(), this, parentBroker, "/"));
agent->addObject (mgmtObject, 3, 1);
}
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index c38e273c49..c8a1b37823 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -37,6 +37,8 @@ class ManagementAgent
static shared_ptr getAgent (void);
+ virtual int getMaxThreads() = 0;
+
virtual void RegisterClass (std::string packageName,
std::string className,
uint8_t* md5Sum,
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
index 24d18875b6..271a2ec73c 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
@@ -47,8 +47,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent ()
mgmtObject->resourceDestroy ();
}
-ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) :
- dataDir (_dataDir), interval (_interval), broker (_broker)
+ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) :
+ threadPoolSize(_threads), dataDir(_dataDir), interval(_interval), broker(_broker)
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
localBank = 5;
@@ -105,11 +105,11 @@ void ManagementBroker::writeData ()
}
}
-void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker)
+void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize)
{
enabled = 1;
if (agent.get () == 0)
- agent = shared_ptr (new ManagementBroker (dataDir, interval, broker));
+ agent = shared_ptr (new ManagementBroker (dataDir, interval, broker, threadPoolSize));
}
ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
@@ -634,7 +634,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
agent->mgmtObject = management::Agent::shared_ptr
- (new management::Agent (agent));
+ (new management::Agent (this, agent));
agent->mgmtObject->set_sessionId (sessionId);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h
index 7548773960..18d30096e5 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.h
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.h
@@ -41,19 +41,21 @@ class ManagementBroker : public ManagementAgent
{
private:
- ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker);
+ ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
+ int threadPoolSize;
public:
virtual ~ManagementBroker ();
- static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker);
+ static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
static shared_ptr getAgent (void);
static void shutdown (void);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
broker::Exchange::shared_ptr directExchange);
+ int getMaxThreads () { return threadPoolSize; }
void RegisterClass (std::string packageName,
std::string className,
uint8_t* md5Sum,
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 68d7e5c886..2528ed4284 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -21,12 +21,15 @@
#include "Manageable.h"
#include "ManagementObject.h"
+#include "ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::sys;
+int ManagementObject::nextThreadIndex = 0;
+
void ManagementObject::writeTimestamps (Buffer& buf)
{
buf.putShortString (getPackageName ());
@@ -40,3 +43,14 @@ void ManagementObject::writeTimestamps (Buffer& buf)
void ManagementObject::setReference(uint64_t) {}
+int ManagementObject::getThreadIndex() {
+ static __thread int thisIndex = -1;
+ if (thisIndex == -1) {
+ sys::Mutex::ScopedLock mutex(accessLock);
+ thisIndex = nextThreadIndex;
+ if (nextThreadIndex < agent->getMaxThreads() - 1)
+ nextThreadIndex++;
+ }
+ return thisIndex;
+}
+
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index cf2da13b09..732dd14a24 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -32,19 +32,22 @@ namespace qpid {
namespace management {
class Manageable;
+class ManagementAgent;
class ManagementObject
{
protected:
- uint64_t createTime;
- uint64_t destroyTime;
- uint64_t objectId;
- bool configChanged;
- bool instChanged;
- bool deleted;
- Manageable* coreObject;
- sys::Mutex accessLock;
+ uint64_t createTime;
+ uint64_t destroyTime;
+ uint64_t objectId;
+ bool configChanged;
+ bool instChanged;
+ bool deleted;
+ Manageable* coreObject;
+ sys::Mutex accessLock;
+ ManagementAgent* agent;
+ int maxThreads;
static const uint8_t TYPE_U8 = 1;
static const uint8_t TYPE_U16 = 2;
@@ -73,15 +76,18 @@ class ManagementObject
static const uint8_t FLAG_INDEX = 0x02;
static const uint8_t FLAG_END = 0x80;
+ static int nextThreadIndex;
+
+ int getThreadIndex();
void writeTimestamps (qpid::framing::Buffer& buf);
public:
typedef boost::shared_ptr<ManagementObject> shared_ptr;
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
- ManagementObject (Manageable* _core) :
+ ManagementObject (ManagementAgent* _agent, Manageable* _core) :
destroyTime(0), objectId (0), configChanged(true),
- instChanged(true), deleted(false), coreObject(_core)
+ instChanged(true), deleted(false), coreObject(_core), agent(_agent)
{ createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
virtual ~ManagementObject () {}
@@ -102,8 +108,7 @@ class ManagementObject
uint64_t getObjectId (void) { return objectId; }
inline bool getConfigChanged (void) { return configChanged; }
virtual bool getInstChanged (void) { return instChanged; }
- inline void setAllChanged (void)
- {
+ inline void setAllChanged (void) {
configChanged = true;
instChanged = true;
}
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 2d3cf092c4..320e578512 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -124,13 +124,13 @@
===============================================================
-->
<class name="Queue">
- <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/>
- <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
- <property name="durable" type="bool" access="RC"/>
- <property name="autoDelete" type="bool" access="RC"/>
- <property name="exclusive" type="bool" access="RC"/>
- <property name="arguments" type="ftable" access="RO" desc="Arguments supplied in queue.declare"/>
+ <property name="durable" type="bool" access="RC"/>
+ <property name="autoDelete" type="bool" access="RC"/>
+ <property name="exclusive" type="bool" access="RC"/>
+ <property name="arguments" type="map" access="RO" desc="Arguments supplied in queue.declare"/>
<statistic name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/>
<statistic name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/>
@@ -138,9 +138,8 @@
<statistic name="msgTxnDequeues" type="count64" unit="message" desc="Transactional messages dequeued"/>
<statistic name="msgPersistEnqueues" type="count64" unit="message" desc="Persistent messages enqueued"/>
<statistic name="msgPersistDequeues" type="count64" unit="message" desc="Persistent messages dequeued"/>
- <statistic name="msgDepth" type="atomic32" unit="message" desc="Current size of queue in messages"/>
- <statistic name="byteDepth" type="count32" unit="octet"
- assign="byteTotalEnqueues - byteTotalDequeues" desc="Current size of queue in bytes"/>
+ <statistic name="msgDepth" type="count32" unit="message" desc="Current size of queue in messages" assign="msgTotalEnqueues - msgTotalDequeues"/>
+ <statistic name="byteDepth" type="count32" unit="octet" desc="Current size of queue in bytes" assign="byteTotalEnqueues - byteTotalDequeues"/>
<statistic name="byteTotalEnqueues" type="count64" unit="octet" desc="Total messages enqueued"/>
<statistic name="byteTotalDequeues" type="count64" unit="octet" desc="Total messages dequeued"/>
<statistic name="byteTxnEnqueues" type="count64" unit="octet" desc="Transactional messages enqueued"/>
@@ -150,11 +149,11 @@
<statistic name="enqueueTxnStarts" type="count64" unit="transaction" desc="Total enqueue transactions started "/>
<statistic name="enqueueTxnCommits" type="count64" unit="transaction" desc="Total enqueue transactions committed"/>
<statistic name="enqueueTxnRejects" type="count64" unit="transaction" desc="Total enqueue transactions rejected"/>
- <statistic name="enqueueTxnCount" type="hilo32" unit="transaction" desc="Current pending enqueue transactions"/>
+ <statistic name="enqueueTxnCount" type="count32" unit="transaction" desc="Current pending enqueue transactions"/>
<statistic name="dequeueTxnStarts" type="count64" unit="transaction" desc="Total dequeue transactions started"/>
<statistic name="dequeueTxnCommits" type="count64" unit="transaction" desc="Total dequeue transactions committed"/>
<statistic name="dequeueTxnRejects" type="count64" unit="transaction" desc="Total dequeue transactions rejected"/>
- <statistic name="dequeueTxnCount" type="hilo32" unit="transaction" desc="Current pending dequeue transactions"/>
+ <statistic name="dequeueTxnCount" type="count32" unit="transaction" desc="Current pending dequeue transactions"/>
<statistic name="consumerCount" type="hilo32" unit="consumer" desc="Current consumers on queue"/>
<statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/>
<statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/>
@@ -190,10 +189,10 @@
===============================================================
-->
<class name="Binding">
- <property name="exchangeRef" type="objId" references="Exchange" access="RC" index="y" parentRef="y"/>
- <property name="queueRef" type="objId" references="Queue" access="RC" index="y"/>
- <property name="bindingKey" type="sstr" access="RC" index="y"/>
- <property name="arguments" type="ftable" access="RC"/>
+ <property name="exchangeRef" type="objId" references="Exchange" access="RC" index="y" parentRef="y"/>
+ <property name="queueRef" type="objId" references="Queue" access="RC" index="y"/>
+ <property name="bindingKey" type="sstr" access="RC" index="y"/>
+ <property name="arguments" type="map" access="RC"/>
<statistic name="msgMatched" type="count64"/>
</class>
diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml
index aad6b348c3..7ed320f6fa 100644
--- a/qpid/specs/management-types.xml
+++ b/qpid/specs/management-types.xml
@@ -32,30 +32,21 @@
<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/>
<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/>
<type name="uuid" base="UUID" cpp="framing::Uuid" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/>
-<type name="ftable" base="FTABLE" cpp="framing::FieldTable" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/>
+<type name="map" base="FTABLE" cpp="framing::FieldTable" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/>
<type name="hilo32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="wm" accessor="counter" init="0"/>
<type name="hilo64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="wm" accessor="counter" init="0"/>
-<type name="count8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="counter" init="0"/>
-<type name="count16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="counter" init="0"/>
-<type name="count32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="counter" init="0"/>
-<type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0"/>
-
-<type name="atomic32" base="U32" cpp="qpid::sys::AtomicCount" encode="@.putLong(#)" decode="" accessor="counterByOne"/>
+<type name="count8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="counter" init="0" perThread="y"/>
+<type name="count16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="counter" init="0" perThread="y"/>
+<type name="count32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="counter" init="0" perThread="y"/>
+<type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0" perThread="y"/>
<!-- Min/Max/Average statistics -->
-<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/>
-<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
-<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
-
-<!-- Some Proposed Syntax for User-Defined Types:
-<enum name="enumeratedType" base="U8">
- <item name="value-name1" value="1"/>
- <item name="value-name2" value="2"/>
-</enum>
--->
+<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0" perThread="y"/>
+<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0" perThread="y"/>
+<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0" perThread="y"/>
</schema-types>