diff options
author | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-06-30 19:00:49 +0000 |
commit | d2051d8e6910c4cbcd9c2ce2ef01089360f83e43 (patch) | |
tree | 14142fcee4c5aa5decfaf138f2d04e8d6f1b9651 | |
parent | 061d6a61e73c8d4e43a711e526d6586db9f54c01 (diff) | |
download | qpid-python-d2051d8e6910c4cbcd9c2ce2ef01089360f83e43.tar.gz |
QPID-1160 - Per-thread counters in management API to avoid locking
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@672864 13f79535-47bb-0310-9956-ffa450edef68
21 files changed, 394 insertions, 188 deletions
diff --git a/qpid/cpp/managementgen/generate.py b/qpid/cpp/managementgen/generate.py index da78d6c7e9..197deec4f1 100755 --- a/qpid/cpp/managementgen/generate.py +++ b/qpid/cpp/managementgen/generate.py @@ -30,39 +30,62 @@ class Template: Expandable File Template - This class is instantiated each time a template is to be expanded. It is instantiated with the "filename" which is the full path to the template file and the "handler" which - is an object that is responsible for storing variables (setVariable) - and expanding tags (substHandler). + is an object that is responsible for storing variables (setVariable), + checking conditions (testCondition), and expanding tags (substHandler). """ def __init__ (self, filename, handler): self.filename = filename self.handler = handler self.handler.initExpansion () + self.writing = True def expandLine (self, line, stream, object): cursor = 0 while 1: sub = line.find ("/*MGEN:", cursor) if sub == -1: - stream.write (line[cursor:len (line)]) + if self.writing: + stream.write (line[cursor:len (line)]) return subend = line.find("*/", sub) - stream.write (line[cursor:sub]) + if self.writing: + stream.write (line[cursor:sub]) cursor = subend + 2 - tag = line[sub:subend] - equalPos = tag.find ("=") - if equalPos == -1: - dotPos = tag.find (".") + tag = line[sub:subend] + + if tag[7:10] == "IF(": + close = tag.find(")") + if close == -1: + raise ValueError ("Missing ')' on condition") + cond = tag[10:close] + dotPos = cond.find (".") if dotPos == -1: - raise ValueError ("Invalid tag: %s" % tag) - tagObject = tag[7:dotPos] - tagName = tag[dotPos + 1:len (tag)] - self.handler.substHandler (object, stream, tagObject, tagName) + raise ValueError ("Invalid condition tag: %s" % cond) + tagObject = cond[0:dotPos] + tagName = cond[dotPos + 1 : len(cond)] + if not self.handler.testCondition(object, tagObject, tagName): + self.writing = False + + elif tag[7:12] == "ENDIF": + self.writing = True + else: - tagKey = tag[7:equalPos] - tagVal = tag[equalPos + 1:len (tag)] - self.handler.setVariable (tagKey, tagVal) + equalPos = tag.find ("=") + if equalPos == -1: + dotPos = tag.find (".") + if dotPos == -1: + raise ValueError ("Invalid tag: %s" % tag) + tagObject = tag[7:dotPos] + tagName = tag[dotPos + 1:len (tag)] + if self.writing: + self.handler.substHandler (object, stream, tagObject, tagName) + else: + tagKey = tag[7:equalPos] + tagVal = tag[equalPos + 1:len (tag)] + if self.writing: + self.handler.setVariable (tagKey, tagVal) def expand (self, object): fd = open (self.filename) @@ -224,6 +247,15 @@ class Generator: call = obj + ".gen" + tag + "(stream, self.variables)" eval (call) + def testCondition (self, object, tagObject, tag): + if tagObject == "Root": + obj = "self" + else: + obj = "object" # MUST be the same as the 2nd formal parameter + + call = obj + ".test" + tag + "(self.variables)" + return eval (call) + def setVariable (self, key, value): self.variables[key] = value diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py index 4e1f898274..4f5dc216ab 100755 --- a/qpid/cpp/managementgen/schema.py +++ b/qpid/cpp/managementgen/schema.py @@ -26,14 +26,15 @@ import md5 #===================================================================================== class SchemaType: def __init__ (self, node): - self.name = None - self.base = None - self.cpp = None - self.encode = None - self.decode = None - self.style = "normal" - self.accessor = None - self.init = "0" + self.name = None + self.base = None + self.cpp = None + self.encode = None + self.decode = None + self.style = "normal" + self.accessor = None + self.init = "0" + self.perThread = False attrs = node.attributes for idx in range (attrs.length): @@ -63,6 +64,11 @@ class SchemaType: elif key == 'init': self.init = val + elif key == 'perThread': + if val != 'y': + raise ValueError ("Expected 'y' in perThread attribute") + self.perThread = True + else: raise ValueError ("Unknown attribute in type '%s'" % key) @@ -74,43 +80,38 @@ class SchemaType: return self.name def genAccessor (self, stream, varName, changeFlag = None): + if self.perThread: + prefix = "getThreadStats()->" + if self.style == "wm": + raise ValueError ("'wm' style types can't be per-thread") + else: + prefix = "" if self.accessor == "direct": stream.write (" inline void set_" + varName + " (" + self.cpp + " val){\n"); - stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") + if not self.perThread: + stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") if self.style != "mma": - stream.write (" " + varName + " = val;\n"); + stream.write (" " + prefix + varName + " = val;\n"); if self.style == "wm": stream.write (" if (" + varName + "Low > val)\n") stream.write (" " + varName + "Low = val;\n") stream.write (" if (" + varName + "High < val)\n") stream.write (" " + varName + "High = val;\n") if self.style == "mma": - stream.write (" " + varName + "Count++;\n") - stream.write (" " + varName + "Total += val;\n") - stream.write (" if (" + varName + "Min > val)\n") - stream.write (" " + varName + "Min = val;\n") - stream.write (" if (" + varName + "Max < val)\n") - stream.write (" " + varName + "Max = val;\n") - if changeFlag != None: - stream.write (" " + changeFlag + " = true;\n") - stream.write (" }\n"); - elif self.accessor == "counterByOne": - stream.write (" inline void inc_" + varName + " (){\n"); - stream.write (" ++" + varName + ";\n") - if changeFlag != None: - stream.write (" " + changeFlag + " = true;\n") - stream.write (" }\n"); - stream.write (" inline void dec_" + varName + " (){\n"); - stream.write (" --" + varName + ";\n") + stream.write (" " + prefix + varName + "Count++;\n") + stream.write (" " + prefix + varName + "Total += val;\n") + stream.write (" if (" + prefix + varName + "Min > val)\n") + stream.write (" " + prefix + varName + "Min = val;\n") + stream.write (" if (" + prefix + varName + "Max < val)\n") + stream.write (" " + prefix + varName + "Max = val;\n") if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); elif self.accessor == "counter": stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n"); - stream.write (" if (by == 1)\n") - stream.write (" ++" + varName + ";\n") - stream.write (" else\n") - stream.write (" " + varName + " += by;\n") + if not self.perThread: + stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") + stream.write (" " + prefix + varName + " += by;\n") if self.style == "wm": stream.write (" if (" + varName + "High < " + varName + ")\n") stream.write (" " + varName + "High = " + varName + ";\n") @@ -118,27 +119,15 @@ class SchemaType: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n"); - stream.write (" if (by == 1)\n") - stream.write (" " + varName + "--;\n") - stream.write (" else\n") - stream.write (" " + varName + " -= by;\n") + if not self.perThread: + stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") + stream.write (" " + prefix + varName + " -= by;\n") if self.style == "wm": stream.write (" if (" + varName + "Low > " + varName + ")\n") stream.write (" " + varName + "Low = " + varName + ";\n") if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); - stream.write (" inline void set_" + varName + " (" + self.cpp + " val){\n"); - stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") - stream.write (" " + varName + " = val;\n"); - if self.style == "wm": - stream.write (" if (" + varName + "Low > val)\n") - stream.write (" " + varName + "Low = val;\n") - stream.write (" if (" + varName + "High < val)\n") - stream.write (" " + varName + "High = val;\n") - if changeFlag != None: - stream.write (" " + changeFlag + " = true;\n") - stream.write (" }\n"); def genHiLoStatResets (self, stream, varName): if self.style == "wm": @@ -150,6 +139,13 @@ class SchemaType: stream.write (" " + varName + "Min = -1;\n") stream.write (" " + varName + "Max = 0;\n") + def genPerThreadHiLoStatResets (self, stream, varName): + if self.style == "mma": + stream.write (" threadStats->" + varName + "Count = 0;\n") + stream.write (" threadStats->" + varName + "Total = 0;\n") + stream.write (" threadStats->" + varName + "Min = -1;\n") + stream.write (" threadStats->" + varName + "Max = 0;\n") + def genWrite (self, stream, varName): if self.style != "mma": stream.write (" " + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n") @@ -235,6 +231,8 @@ class SchemaConfig: elif key == 'type': self.type = Type (val, typespec) + if self.type.type.accessor != 'direct': + raise ValueError ("Class properties must have a type with a direct accessor") elif key == 'references': self.ref = val @@ -288,8 +286,8 @@ class SchemaConfig: return 1 return 0 - def genDeclaration (self, stream): - stream.write (" " + self.type.type.cpp + " " + self.name + ";\n") + def genDeclaration (self, stream, prefix=" "): + stream.write (prefix + self.type.type.cpp + " " + self.name + ";\n") def genFormalParam (self, stream): stream.write (self.type.type.cpp + " _" + self.name) @@ -360,17 +358,17 @@ class SchemaInst: def getName (self): return self.name - def genDeclaration (self, stream): + def genDeclaration (self, stream, prefix=" "): if self.type.type.style != "mma": - stream.write (" " + self.type.type.cpp + " " + self.name + ";\n") + stream.write (prefix + self.type.type.cpp + " " + self.name + ";\n") if self.type.type.style == 'wm': - stream.write (" " + self.type.type.cpp + " " + self.name + "High;\n") - stream.write (" " + self.type.type.cpp + " " + self.name + "Low;\n") + stream.write (prefix + self.type.type.cpp + " " + self.name + "High;\n") + stream.write (prefix + self.type.type.cpp + " " + self.name + "Low;\n") if self.type.type.style == "mma": - stream.write (" " + self.type.type.cpp + " " + self.name + "Count;\n") - stream.write (" uint64_t " + self.name + "Total;\n") - stream.write (" " + self.type.type.cpp + " " + self.name + "Min;\n") - stream.write (" " + self.type.type.cpp + " " + self.name + "Max;\n") + stream.write (prefix + self.type.type.cpp + " " + self.name + "Count;\n") + stream.write (prefix + "uint64_t " + self.name + "Total;\n") + stream.write (prefix + self.type.type.cpp + " " + self.name + "Min;\n") + stream.write (prefix + self.type.type.cpp + " " + self.name + "Max;\n") def genAccessor (self, stream): self.type.type.genAccessor (stream, self.name, "instChanged") @@ -378,6 +376,9 @@ class SchemaInst: def genHiLoStatResets (self, stream): self.type.type.genHiLoStatResets (stream, self.name) + def genPerThreadHiLoStatResets (self, stream): + self.type.type.genPerThreadHiLoStatResets (stream, self.name) + def genSchemaText (self, stream, name, desc): stream.write (" ft = FieldTable ();\n") stream.write (" ft.setString (NAME, \"" + name + "\");\n") @@ -416,26 +417,51 @@ class SchemaInst: def genAssign (self, stream): if self.assign != None: - stream.write (" " + self.name + " = (" + self.type.type.cpp + ") (" + self.assign + ");\n") + if self.type.type.perThread: + prefix = " threadStats->" + else: + prefix = "" + stream.write (" " + prefix + self.name + " = (" + self.type.type.cpp + + ") (" + self.assign + ");\n") def genWrite (self, stream): - self.type.type.genWrite (stream, self.name) + if self.type.type.perThread: + self.type.type.genWrite (stream, "totals." + self.name) + else: + self.type.type.genWrite (stream, self.name) - def genInitialize (self, stream): + def genInitialize (self, stream, prefix="", indent=" "): val = self.type.type.init - if self.type.type.accessor == "counterByOne": - return if self.type.type.style != "mma": - stream.write (" " + self.name + " = " + val + ";\n") + stream.write (indent + prefix + self.name + " = " + val + ";\n") if self.type.type.style == "wm": - stream.write (" " + self.name + "High = " + val + ";\n") - stream.write (" " + self.name + "Low = " + val + ";\n") + stream.write (indent + prefix + self.name + "High = " + val + ";\n") + stream.write (indent + prefix + self.name + "Low = " + val + ";\n") if self.type.type.style == "mma": - stream.write (" " + self.name + "Count = 0;\n") - stream.write (" " + self.name + "Min = -1;\n") - stream.write (" " + self.name + "Max = 0;\n") - stream.write (" " + self.name + "Total = 0;\n") + stream.write (indent + prefix + self.name + "Count = 0;\n") + stream.write (indent + prefix + self.name + "Min = -1;\n") + stream.write (indent + prefix + self.name + "Max = 0;\n") + stream.write (indent + prefix + self.name + "Total = 0;\n") + def genInitializeTotalPerThreadStats (self, stream): + if self.type.type.style == "mma": + stream.write (" totals->" + self.name + "Count = 0;\n") + stream.write (" totals->" + self.name + "Min = -1;\n") + stream.write (" totals->" + self.name + "Max = 0;\n") + stream.write (" totals->" + self.name + "Total = 0;\n") + else: + stream.write (" totals->" + self.name + " = 0;\n") + + def genAggregatePerThreadStats (self, stream): + if self.type.type.style == "mma": + stream.write (" totals->%sCount += threadStats->%sCount;\n" % (self.name, self.name)) + stream.write (" if (totals->%sMin > threadStats->%sMin)\n" % (self.name, self.name)) + stream.write (" totals->%sMin = threadStats->%sMin;\n" % (self.name, self.name)) + stream.write (" if (totals->%sMax < threadStats->%sMax)\n" % (self.name, self.name)) + stream.write (" totals->%sMax = threadStats->%sMax;\n" % (self.name, self.name)) + stream.write (" totals->%sTotal += threadStats->%sTotal;\n" % (self.name, self.name)) + else: + stream.write (" totals->%s += threadStats->%s;\n" % (self.name, self.name)) #===================================================================================== # @@ -664,6 +690,26 @@ class SchemaClass: else: raise ValueError ("Unknown class tag '%s'" % child.nodeName) + # Adjust the 'assign' attributes for each statistic + for stat in self.statistics: + if stat.assign != None and stat.type.type.perThread: + stat.assign = self.adjust (stat.assign, self.statistics) + + def adjust (self, text, statistics): + result = text + start = 0 + while True: + next = None + for stat in statistics: + pos = result.find (stat.name, start) + if pos != -1 and (next == None or pos < next[0]): + next = (pos, stat.name) + if next == None: + return result + pos = next[0] + result = result[0:pos] + "threadStats->" + result[pos:] + start = pos + 9 + len(next[1]) + def hash (self, node): attrs = node.attributes self.md5Sum.update (node.nodeName) @@ -711,12 +757,34 @@ class SchemaClass: # Code Generation Functions. The names of these functions (minus the leading "gen") # match the substitution keywords in the template files. #=================================================================================== + def testExistPerThreadStats (self, variables): + for inst in self.statistics: + if inst.type.type.perThread: + return True + return False + + def testExistPerThreadAssign (self, variables): + for inst in self.statistics: + if inst.type.type.perThread and inst.assign != None: + return True + return False + + def testExistPerThreadResets (self, variables): + for inst in self.statistics: + if inst.type.type.perThread and inst.type.type.style == "mma": + return True + return False + + def testNoStatistics (self, variables): + return len (self.statistics) == 0 + def genAccessorMethods (self, stream, variables): for config in self.properties: if config.access != "RC": config.genAccessor (stream) for inst in self.statistics: - inst.genAccessor (stream) + if inst.assign == None: + inst.genAccessor (stream) def genConfigCount (self, stream, variables): stream.write ("%d" % len (self.properties)) @@ -767,16 +835,33 @@ class SchemaClass: def genHiLoStatResets (self, stream, variables): for inst in self.statistics: - inst.genHiLoStatResets (stream) + if not inst.type.type.perThread: + inst.genHiLoStatResets (stream) + + def genPerThreadHiLoStatResets (self, stream, variables): + for inst in self.statistics: + if inst.type.type.perThread: + inst.genPerThreadHiLoStatResets (stream) def genInitializeElements (self, stream, variables): for inst in self.statistics: - inst.genInitialize (stream) + if not inst.type.type.perThread: + inst.genInitialize (stream) + + def genInitializePerThreadElements (self, stream, variables): + for inst in self.statistics: + if inst.type.type.perThread: + inst.genInitialize (stream, "threadStats->", " ") + + def genInitializeTotalPerThreadStats (self, stream, variables): + for inst in self.statistics: + if inst.type.type.perThread: + inst.genInitializeTotalPerThreadStats (stream) - def genInstChangedStub (self, stream, variables): - if len (self.statistics) == 0: - stream.write (" // Stub for getInstChanged. There are no inst elements\n") - stream.write (" bool getInstChanged (void) { return false; }\n") + def genAggregatePerThreadStats (self, stream, variables): + for inst in self.statistics: + if inst.type.type.perThread: + inst.genAggregatePerThreadStats (stream) def genInstCount (self, stream, variables): count = 0 @@ -790,7 +875,13 @@ class SchemaClass: def genInstDeclarations (self, stream, variables): for element in self.statistics: - element.genDeclaration (stream) + if not element.type.type.perThread: + element.genDeclaration (stream) + + def genPerThreadDeclarations (self, stream, variables): + for element in self.statistics: + if element.type.type.perThread: + element.genDeclaration (stream, " ") def genInstElementSchema (self, stream, variables): for inst in self.statistics: @@ -884,7 +975,13 @@ class SchemaClass: def genAssign (self, stream, variables): for inst in self.statistics: - inst.genAssign (stream) + if not inst.type.type.perThread: + inst.genAssign (stream) + + def genPerThreadAssign (self, stream, variables): + for inst in self.statistics: + if inst.type.type.perThread: + inst.genAssign (stream) def genWriteConfig (self, stream, variables): for config in self.properties: diff --git a/qpid/cpp/managementgen/templates/Class.cpp b/qpid/cpp/managementgen/templates/Class.cpp index 0fbb78b7f1..1ea1da5090 100644 --- a/qpid/cpp/managementgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/templates/Class.cpp @@ -23,6 +23,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" +#include "qpid/management/ManagementAgent.h" #include "/*MGEN:Class.NameCap*/.h" /*MGEN:Class.MethodArgIncludes*/ @@ -36,15 +37,28 @@ string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/ uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = {/*MGEN:Class.SchemaMD5*/}; -/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : - ManagementObject(_core) - /*MGEN:Class.ConstructorInits*/ +/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent* _agent, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : + ManagementObject(_agent, _core)/*MGEN:Class.ConstructorInits*/ { /*MGEN:Class.ParentRefAssignment*/ /*MGEN:Class.InitializeElements*/ +/*MGEN:IF(Class.ExistPerThreadStats)*/ + maxThreads = agent->getMaxThreads(); + perThreadStatsArray = new struct PerThreadStats*[maxThreads]; + for (int idx = 0; idx < maxThreads; idx++) + perThreadStatsArray[idx] = 0; +/*MGEN:ENDIF*/ } -/*MGEN:Class.NameCap*/::~/*MGEN:Class.NameCap*/ () {} +/*MGEN:Class.NameCap*/::~/*MGEN:Class.NameCap*/ () +{ +/*MGEN:IF(Class.ExistPerThreadStats)*/ + for (int idx = 0; idx < maxThreads; idx++) + if (perThreadStatsArray[idx] != 0) + delete perThreadStatsArray[idx]; + delete perThreadStatsArray; +/*MGEN:ENDIF*/ +} namespace { const string NAME("name"); @@ -85,6 +99,19 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) /*MGEN:Class.EventSchema*/ } +/*MGEN:IF(Class.ExistPerThreadStats)*/ +void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* totals) +{ +/*MGEN:Class.InitializeTotalPerThreadStats*/ + for (int idx = 0; idx < maxThreads; idx++) { + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; + if (threadStats != 0) { +/*MGEN:Class.AggregatePerThreadStats*/ + } + } +} +/*MGEN:ENDIF*/ + void /*MGEN:Class.NameCap*/::writeProperties (Buffer& buf) { sys::Mutex::ScopedLock mutex(accessLock); @@ -98,14 +125,33 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders) { sys::Mutex::ScopedLock mutex(accessLock); instChanged = false; +/*MGEN:IF(Class.ExistPerThreadAssign)*/ + for (int idx = 0; idx < maxThreads; idx++) { + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; + if (threadStats != 0) { +/*MGEN:Class.PerThreadAssign*/ + } + } +/*MGEN:ENDIF*/ +/*MGEN:IF(Class.ExistPerThreadStats)*/ + struct PerThreadStats totals; + aggregatePerThreadStats(&totals); +/*MGEN:ENDIF*/ /*MGEN:Class.Assign*/ - if (!skipHeaders) writeTimestamps (buf); /*MGEN:Class.WriteInst*/ // Maintenance of hi-lo statistics /*MGEN:Class.HiLoStatResets*/ +/*MGEN:IF(Class.ExistPerThreadResets)*/ + for (int idx = 0; idx < maxThreads; idx++) { + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; + if (threadStats != 0) { +/*MGEN:Class.PerThreadHiLoStatResets*/ + } + } +/*MGEN:ENDIF*/ } void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/) diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h index aa01f8f105..557c7a45d5 100644 --- a/qpid/cpp/managementgen/templates/Class.h +++ b/qpid/cpp/managementgen/templates/Class.h @@ -26,7 +26,6 @@ #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" -#include "qpid/sys/AtomicCount.h" namespace qpid { namespace management { @@ -43,6 +42,27 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject /*MGEN:Class.ConfigDeclarations*/ // Statistics /*MGEN:Class.InstDeclarations*/ +/*MGEN:IF(Class.ExistPerThreadStats)*/ + // Per-Thread Statistics + struct PerThreadStats { +/*MGEN:Class.PerThreadDeclarations*/ + }; + + struct PerThreadStats** perThreadStatsArray; + + inline struct PerThreadStats* getThreadStats() { + int index = getThreadIndex(); + struct PerThreadStats* threadStats = perThreadStatsArray[index]; + if (threadStats == 0) { + threadStats = new(PerThreadStats); + perThreadStatsArray[index] = threadStats; +/*MGEN:Class.InitializePerThreadElements*/ + } + return threadStats; + } + + void aggregatePerThreadStats(struct PerThreadStats*); +/*MGEN:ENDIF*/ // Private Methods static void writeSchema (qpid::framing::Buffer& buf); void writeProperties (qpid::framing::Buffer& buf); @@ -51,15 +71,18 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf); - writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; } - -/*MGEN:Class.InstChangedStub*/ + writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; } +/*MGEN:IF(Class.NoStatistics)*/ + // Stub for getInstChanged. There are no statistics in this class. + bool getInstChanged (void) { return false; } +/*MGEN:ENDIF*/ public: friend class Package/*MGEN:Class.NamePackageCap*/; typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; - /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); + /*MGEN:Class.NameCap*/ (ManagementAgent* agent, + Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); ~/*MGEN:Class.NameCap*/ (void); /*MGEN:Class.SetGeneralReferenceDeclaration*/ diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 18b2c52dad..9274de0555 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -38,13 +38,17 @@ namespace broker { Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), - mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, - args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes)), listener(l), name(Uuid(true).str()), persistenceId(0) { - if (!args.i_durable) - management::ManagementAgent::getAgent()->addObject(mgmtObject); + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); + if (agent.get() != 0) { + mgmtObject = management::Bridge::shared_ptr + (new management::Bridge(agent.get(), this, link, id, args.i_durable, args.i_src, args.i_dest, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, + args.i_tag, args.i_excludes)); + if (!args.i_durable) + agent->addObject(mgmtObject); + } } Bridge::~Bridge() diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index a3dd93899a..0b7886b3ba 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -135,7 +135,7 @@ Broker::Broker(const Broker::Options& conf) : if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); ManagementBroker::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), - conf.mgmtPubInterval, this); + conf.mgmtPubInterval, this, conf.workerThreads + 3); managementAgent = management::ManagementAgent::getAgent (); ((ManagementBroker*) managementAgent.get())->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); @@ -143,7 +143,7 @@ Broker::Broker(const Broker::Options& conf) : System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); systemObject = System::shared_ptr (system); - mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port)); + mgmtObject = management::Broker::shared_ptr (new management::Broker (managementAgent.get(), this, system, conf.port)); mgmtObject->set_workerThreads (conf.workerThreads); mgmtObject->set_maxConns (conf.maxConnections); mgmtObject->set_connBacklog (conf.connectionBacklog); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index b6f6b9cee9..9e763f6775 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -65,7 +65,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); if (agent.get() != 0) - mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink)); + mgmtObject = management::Connection::shared_ptr + (new management::Connection(agent.get(), this, parent, mgmtId, !isLink)); agent->addObject(mgmtObject); } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 30a93e338c..c72b148338 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -40,7 +40,7 @@ Exchange::Exchange (const string& _name, Manageable* parent) : if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name, durable)); + (new management::Exchange (agent.get(), this, parent, _name, durable)); agent->addObject (mgmtExchange); } } @@ -56,7 +56,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel if (agent.get () != 0) { mgmtExchange = management::Exchange::shared_ptr - (new management::Exchange (this, parent, _name, durable)); + (new management::Exchange (agent.get(), this, parent, _name, durable)); if (!durable) { if (name == "") agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID @@ -134,7 +134,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang { uint64_t queueId = mo->getObjectId(); mgmtBinding = management::Binding::shared_ptr - (new management::Binding (this, (Manageable*) parent, queueId, key, args)); + (new management::Binding (agent.get(), this, (Manageable*) parent, queueId, key, args)); agent->addObject (mgmtBinding); } } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 630ce68150..87c0020dcb 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -63,7 +63,7 @@ Link::Link(LinkRegistry* _links, if (agent.get() != 0) { mgmtObject = management::Link::shared_ptr - (new management::Link(this, parent, _host, _port, _useSsl, _durable)); + (new management::Link(agent.get(), this, parent, _host, _port, _useSsl, _durable)); if (!durable) agent->addObject(mgmtObject); } @@ -109,7 +109,8 @@ void Link::startConnectionLH () boost::bind (&Link::closed, this, _1, _2)); } catch(std::exception& e) { setStateLH(STATE_WAITING); - mgmtObject->set_lastError (e.what()); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError (e.what()); } } @@ -141,7 +142,8 @@ void Link::closed (int, std::string text) if (state != STATE_FAILED) { setStateLH(STATE_WAITING); - mgmtObject->set_lastError (text); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError (text); } if (closing) @@ -257,7 +259,8 @@ void Link::notifyConnectionForced(const string text) Mutex::ScopedLock mutex(lock); setStateLH(STATE_FAILED); - mgmtObject->set_lastError(text); + if (mgmtObject.get() != 0) + mgmtObject->set_lastError(text); } void Link::setPersistenceId(uint64_t id) const diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index becca8dfcf..40f249bc11 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -71,7 +71,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent.get () != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0)); + (new management::Queue (agent.get(), this, parent, _name, _store != 0, _autodelete, _owner != 0)); // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. @@ -113,6 +113,7 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ + if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -128,19 +129,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ // if no store then mark as enqueued if (!enqueue(0, msg)){ if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } push(msg); msg->enqueueComplete(); }else { if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } @@ -155,12 +152,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); } if (store && !msg->isContentLoaded()) { @@ -173,12 +168,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - mgmtObject->inc_msgDepth (); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); @@ -362,10 +355,8 @@ void Queue::consume(Consumer& c, bool requestExclusive){ } consumerCount++; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->inc_consumerCount (); - } } void Queue::cancel(Consumer& c){ @@ -373,10 +364,8 @@ void Queue::cancel(Consumer& c){ Mutex::ScopedLock locker(consumerLock); consumerCount--; if(exclusive) exclusive = 0; - if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); + if (mgmtObject.get() != 0) mgmtObject->dec_consumerCount (); - } } QueuedMessage Queue::dequeue(){ @@ -413,10 +402,8 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); - mgmtObject->dec_msgDepth (); if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index d7089424a5..95145e5d0e 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -64,7 +64,7 @@ SessionState::SessionState( ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); if (agent.get () != 0) { mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, getId().getName())); + (new management::Session (agent.get(), this, parent, getId().getName())); mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); agent->addObject (mgmtObject); diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index da886710ac..107942fab5 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -63,7 +63,7 @@ System::System (string _dataDir) } mgmtObject = management::System::shared_ptr - (new management::System (this, systemId)); + (new management::System (agent.get(), this, systemId)); struct utsname _uname; if (uname (&_uname) == 0) { diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index a809679d57..cfe497c788 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker) if (agent.get () != 0) { mgmtObject = management::Vhost::shared_ptr - (new management::Vhost (this, parentBroker, "/")); + (new management::Vhost (agent.get(), this, parentBroker, "/")); agent->addObject (mgmtObject, 3, 1); } } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ /dev/null diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index c38e273c49..c8a1b37823 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -37,6 +37,8 @@ class ManagementAgent static shared_ptr getAgent (void); + virtual int getMaxThreads() = 0; + virtual void RegisterClass (std::string packageName, std::string className, uint8_t* md5Sum, diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 24d18875b6..271a2ec73c 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -47,8 +47,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent () mgmtObject->resourceDestroy (); } -ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) : - dataDir (_dataDir), interval (_interval), broker (_broker) +ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) : + threadPoolSize(_threads), dataDir(_dataDir), interval(_interval), broker(_broker) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); localBank = 5; @@ -105,11 +105,11 @@ void ManagementBroker::writeData () } } -void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker) +void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize) { enabled = 1; if (agent.get () == 0) - agent = shared_ptr (new ManagementBroker (dataDir, interval, broker)); + agent = shared_ptr (new ManagementBroker (dataDir, interval, broker, threadPoolSize)); } ManagementAgent::shared_ptr ManagementAgent::getAgent (void) @@ -634,7 +634,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; agent->mgmtObject = management::Agent::shared_ptr - (new management::Agent (agent)); + (new management::Agent (this, agent)); agent->mgmtObject->set_sessionId (sessionId); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 7548773960..18d30096e5 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -41,19 +41,21 @@ class ManagementBroker : public ManagementAgent { private: - ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker); + ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); + int threadPoolSize; public: virtual ~ManagementBroker (); - static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker); + static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); static shared_ptr getAgent (void); static void shutdown (void); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); + int getMaxThreads () { return threadPoolSize; } void RegisterClass (std::string packageName, std::string className, uint8_t* md5Sum, diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 68d7e5c886..2528ed4284 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -21,12 +21,15 @@ #include "Manageable.h" #include "ManagementObject.h" +#include "ManagementAgent.h" #include "qpid/framing/FieldTable.h" using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; +int ManagementObject::nextThreadIndex = 0; + void ManagementObject::writeTimestamps (Buffer& buf) { buf.putShortString (getPackageName ()); @@ -40,3 +43,14 @@ void ManagementObject::writeTimestamps (Buffer& buf) void ManagementObject::setReference(uint64_t) {} +int ManagementObject::getThreadIndex() { + static __thread int thisIndex = -1; + if (thisIndex == -1) { + sys::Mutex::ScopedLock mutex(accessLock); + thisIndex = nextThreadIndex; + if (nextThreadIndex < agent->getMaxThreads() - 1) + nextThreadIndex++; + } + return thisIndex; +} + diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index cf2da13b09..732dd14a24 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -32,19 +32,22 @@ namespace qpid { namespace management { class Manageable; +class ManagementAgent; class ManagementObject { protected: - uint64_t createTime; - uint64_t destroyTime; - uint64_t objectId; - bool configChanged; - bool instChanged; - bool deleted; - Manageable* coreObject; - sys::Mutex accessLock; + uint64_t createTime; + uint64_t destroyTime; + uint64_t objectId; + bool configChanged; + bool instChanged; + bool deleted; + Manageable* coreObject; + sys::Mutex accessLock; + ManagementAgent* agent; + int maxThreads; static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; @@ -73,15 +76,18 @@ class ManagementObject static const uint8_t FLAG_INDEX = 0x02; static const uint8_t FLAG_END = 0x80; + static int nextThreadIndex; + + int getThreadIndex(); void writeTimestamps (qpid::framing::Buffer& buf); public: typedef boost::shared_ptr<ManagementObject> shared_ptr; typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject (Manageable* _core) : + ManagementObject (ManagementAgent* _agent, Manageable* _core) : destroyTime(0), objectId (0), configChanged(true), - instChanged(true), deleted(false), coreObject(_core) + instChanged(true), deleted(false), coreObject(_core), agent(_agent) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} @@ -102,8 +108,7 @@ class ManagementObject uint64_t getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } virtual bool getInstChanged (void) { return instChanged; } - inline void setAllChanged (void) - { + inline void setAllChanged (void) { configChanged = true; instChanged = true; } diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 2d3cf092c4..320e578512 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -124,13 +124,13 @@ =============================================================== --> <class name="Queue"> - <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> - <property name="name" type="sstr" access="RC" index="y"/> + <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> - <property name="durable" type="bool" access="RC"/> - <property name="autoDelete" type="bool" access="RC"/> - <property name="exclusive" type="bool" access="RC"/> - <property name="arguments" type="ftable" access="RO" desc="Arguments supplied in queue.declare"/> + <property name="durable" type="bool" access="RC"/> + <property name="autoDelete" type="bool" access="RC"/> + <property name="exclusive" type="bool" access="RC"/> + <property name="arguments" type="map" access="RO" desc="Arguments supplied in queue.declare"/> <statistic name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/> <statistic name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/> @@ -138,9 +138,8 @@ <statistic name="msgTxnDequeues" type="count64" unit="message" desc="Transactional messages dequeued"/> <statistic name="msgPersistEnqueues" type="count64" unit="message" desc="Persistent messages enqueued"/> <statistic name="msgPersistDequeues" type="count64" unit="message" desc="Persistent messages dequeued"/> - <statistic name="msgDepth" type="atomic32" unit="message" desc="Current size of queue in messages"/> - <statistic name="byteDepth" type="count32" unit="octet" - assign="byteTotalEnqueues - byteTotalDequeues" desc="Current size of queue in bytes"/> + <statistic name="msgDepth" type="count32" unit="message" desc="Current size of queue in messages" assign="msgTotalEnqueues - msgTotalDequeues"/> + <statistic name="byteDepth" type="count32" unit="octet" desc="Current size of queue in bytes" assign="byteTotalEnqueues - byteTotalDequeues"/> <statistic name="byteTotalEnqueues" type="count64" unit="octet" desc="Total messages enqueued"/> <statistic name="byteTotalDequeues" type="count64" unit="octet" desc="Total messages dequeued"/> <statistic name="byteTxnEnqueues" type="count64" unit="octet" desc="Transactional messages enqueued"/> @@ -150,11 +149,11 @@ <statistic name="enqueueTxnStarts" type="count64" unit="transaction" desc="Total enqueue transactions started "/> <statistic name="enqueueTxnCommits" type="count64" unit="transaction" desc="Total enqueue transactions committed"/> <statistic name="enqueueTxnRejects" type="count64" unit="transaction" desc="Total enqueue transactions rejected"/> - <statistic name="enqueueTxnCount" type="hilo32" unit="transaction" desc="Current pending enqueue transactions"/> + <statistic name="enqueueTxnCount" type="count32" unit="transaction" desc="Current pending enqueue transactions"/> <statistic name="dequeueTxnStarts" type="count64" unit="transaction" desc="Total dequeue transactions started"/> <statistic name="dequeueTxnCommits" type="count64" unit="transaction" desc="Total dequeue transactions committed"/> <statistic name="dequeueTxnRejects" type="count64" unit="transaction" desc="Total dequeue transactions rejected"/> - <statistic name="dequeueTxnCount" type="hilo32" unit="transaction" desc="Current pending dequeue transactions"/> + <statistic name="dequeueTxnCount" type="count32" unit="transaction" desc="Current pending dequeue transactions"/> <statistic name="consumerCount" type="hilo32" unit="consumer" desc="Current consumers on queue"/> <statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/> <statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> @@ -190,10 +189,10 @@ =============================================================== --> <class name="Binding"> - <property name="exchangeRef" type="objId" references="Exchange" access="RC" index="y" parentRef="y"/> - <property name="queueRef" type="objId" references="Queue" access="RC" index="y"/> - <property name="bindingKey" type="sstr" access="RC" index="y"/> - <property name="arguments" type="ftable" access="RC"/> + <property name="exchangeRef" type="objId" references="Exchange" access="RC" index="y" parentRef="y"/> + <property name="queueRef" type="objId" references="Queue" access="RC" index="y"/> + <property name="bindingKey" type="sstr" access="RC" index="y"/> + <property name="arguments" type="map" access="RC"/> <statistic name="msgMatched" type="count64"/> </class> diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml index aad6b348c3..7ed320f6fa 100644 --- a/qpid/specs/management-types.xml +++ b/qpid/specs/management-types.xml @@ -32,30 +32,21 @@ <type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/> <type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/> <type name="uuid" base="UUID" cpp="framing::Uuid" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/> -<type name="ftable" base="FTABLE" cpp="framing::FieldTable" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/> +<type name="map" base="FTABLE" cpp="framing::FieldTable" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/> <type name="hilo32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="wm" accessor="counter" init="0"/> <type name="hilo64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="wm" accessor="counter" init="0"/> -<type name="count8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="counter" init="0"/> -<type name="count16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="counter" init="0"/> -<type name="count32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="counter" init="0"/> -<type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0"/> - -<type name="atomic32" base="U32" cpp="qpid::sys::AtomicCount" encode="@.putLong(#)" decode="" accessor="counterByOne"/> +<type name="count8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="counter" init="0" perThread="y"/> +<type name="count16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="counter" init="0" perThread="y"/> +<type name="count32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="counter" init="0" perThread="y"/> +<type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0" perThread="y"/> <!-- Min/Max/Average statistics --> -<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/> -<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/> -<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/> - -<!-- Some Proposed Syntax for User-Defined Types: -<enum name="enumeratedType" base="U8"> - <item name="value-name1" value="1"/> - <item name="value-name2" value="2"/> -</enum> ---> +<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0" perThread="y"/> +<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0" perThread="y"/> +<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0" perThread="y"/> </schema-types> |