summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/managementgen/templates/Class.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp7
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h4
-rw-r--r--qpid/python/mgmt-cli/disp.py77
-rw-r--r--qpid/python/mgmt-cli/main.py164
-rw-r--r--qpid/python/mgmt-cli/managementdata.py417
-rw-r--r--qpid/python/qpid/management.py239
-rw-r--r--qpid/python/qpid/management.py.rej457
-rw-r--r--qpid/specs/management-schema.xml21
9 files changed, 1305 insertions, 83 deletions
diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h
index cff915412e..6a54b2131c 100644
--- a/qpid/cpp/managementgen/templates/Class.h
+++ b/qpid/cpp/managementgen/templates/Class.h
@@ -22,6 +22,7 @@
/*MGEN:Root.Disclaimer*/
+#include "qpid/sys/Mutex.h"
#include "qpid/management/ManagementObject.h"
namespace qpid {
@@ -52,6 +53,7 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
public:
typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr;
+ qpid::sys::Mutex accessorLock;
/*MGEN:Class.NameCap*/ (Manageable* coreObject, Manageable* parentObject,
/*MGEN:Class.ConstructorArgs*/);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index e2fd998cc0..a5384014d8 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -97,6 +97,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete();
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -104,6 +105,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
}
}else {
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -122,6 +124,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -140,6 +143,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){
void Queue::process(intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
+ Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -323,6 +327,7 @@ void Queue::consume(Consumer&, bool requestExclusive){
consumerCount++;
if (mgmtObject.get() != 0){
+ Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_consumers ();
}
}
@@ -333,6 +338,7 @@ void Queue::cancel(Consumer& c){
consumerCount--;
if(exclusive) exclusive = false;
if (mgmtObject.get() != 0){
+ Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->dec_consumers ();
}
}
@@ -363,6 +369,7 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
+ Mutex::ScopedLock alock(mgmtObject->accessorLock);
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
mgmtObject->dec_msgDepth ();
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index a8ba231419..ff136c397d 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -54,8 +54,8 @@ class ManagementObject
static const uint8_t TYPE_LSTR = 7;
static const uint8_t ACCESS_RC = 1;
- static const uint8_t ACCESS_RW = 1;
- static const uint8_t ACCESS_RO = 1;
+ static const uint8_t ACCESS_RW = 2;
+ static const uint8_t ACCESS_RO = 3;
static const uint8_t DIR_I = 1;
static const uint8_t DIR_O = 2;
diff --git a/qpid/python/mgmt-cli/disp.py b/qpid/python/mgmt-cli/disp.py
new file mode 100644
index 0000000000..5746a26e51
--- /dev/null
+++ b/qpid/python/mgmt-cli/disp.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import strftime, gmtime
+
+class Display:
+ """ Display formatting for QPID Management CLI """
+
+ def __init__ (self):
+ self.tableSpacing = 2
+ self.tablePrefix = " "
+ self.timestampFormat = "%X"
+
+ def table (self, title, heads, rows):
+ """ Print a formatted table with autosized columns """
+ print title
+ if len (rows) == 0:
+ return
+ colWidth = []
+ col = 0
+ line = self.tablePrefix
+ for head in heads:
+ width = len (head)
+ for row in rows:
+ cellWidth = len (str (row[col]))
+ if cellWidth > width:
+ width = cellWidth
+ colWidth.append (width + self.tableSpacing)
+ line = line + head
+ for i in range (colWidth[col] - len (head)):
+ line = line + " "
+ col = col + 1
+ print line
+ line = self.tablePrefix
+ for width in colWidth:
+ for i in range (width):
+ line = line + "="
+ print line
+
+ for row in rows:
+ line = self.tablePrefix
+ col = 0
+ for width in colWidth:
+ line = line + str (row[col])
+ for i in range (width - len (str (row[col]))):
+ line = line + " "
+ col = col + 1
+ print line
+
+ def do_setTimeFormat (self, fmt):
+ """ Select timestamp format """
+ if fmt == "long":
+ self.timestampFormat = "%c"
+ elif fmt == "short":
+ self.timestampFormat = "%X"
+
+ def timestamp (self, nsec):
+ """ Format a nanosecond-since-the-epoch timestamp for printing """
+ return strftime (self.timestampFormat, gmtime (nsec / 1000000000))
diff --git a/qpid/python/mgmt-cli/main.py b/qpid/python/mgmt-cli/main.py
new file mode 100644
index 0000000000..2990e25437
--- /dev/null
+++ b/qpid/python/mgmt-cli/main.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import socket
+from cmd import Cmd
+from managementdata import ManagementData
+from shlex import split
+from disp import Display
+from qpid.peer import Closed
+
+class Mcli (Cmd):
+ """ Management Command Interpreter """
+ prompt = "qpid: "
+
+ def __init__ (self, dataObject, dispObject):
+ Cmd.__init__ (self)
+ self.dataObject = dataObject
+ self.dispObject = dispObject
+
+ def emptyline (self):
+ pass
+
+ def do_help (self, data):
+ print "Management Tool for QPID"
+ print
+ print "Commands:"
+ print " list - Print summary of existing objects by class"
+ print " list <className> - Print list of objects of the specified class"
+ print " list <className> all - Print contents of all objects of specified class"
+ print " list <className> <list-of-IDs> - Print contents of one or more objects"
+ print " list is space-separated, ranges may be specified (i.e. 1004-1010)"
+ print " call <ID> <methodName> [<args>] - Invoke a method on an object"
+ print " schema - Print summary of object classes seen on the target"
+ print " schema [className] - Print details of an object class"
+ print " set time-format short - Select short timestamp format (default)"
+ print " set time-format long - Select long timestamp format"
+ print " quit or ^D - Exit the program"
+ print
+
+ def complete_set (self, text, line, begidx, endidx):
+ """ Command completion for the 'set' command """
+ tokens = split (line)
+ if len (tokens) < 2:
+ return ["time-format "]
+ elif tokens[1] == "time-format":
+ if len (tokens) == 2:
+ return ["long", "short"]
+ elif len (tokens) == 3:
+ if "long".find (text) == 0:
+ return ["long"]
+ elif "short".find (text) == 0:
+ return ["short"]
+ elif "time-format".find (text) == 0:
+ return ["time-format "]
+ return []
+
+ def do_set (self, data):
+ tokens = split (data)
+ try:
+ if tokens[0] == "time-format":
+ self.dispObject.do_setTimeFormat (tokens[1])
+ except:
+ pass
+
+ def complete_schema (self, text, line, begidx, endidx):
+ tokens = split (line)
+ if len (tokens) > 2:
+ return []
+ return self.dataObject.classCompletions (text)
+
+ def do_schema (self, data):
+ self.dataObject.do_schema (data)
+
+ def complete_list (self, text, line, begidx, endidx):
+ tokens = split (line)
+ if len (tokens) > 2:
+ return []
+ return self.dataObject.classCompletions (text)
+
+ def do_list (self, data):
+ self.dataObject.do_list (data)
+
+ def do_call (self, data):
+ self.dataObject.do_call (data)
+
+ def do_EOF (self, data):
+ print "quit"
+ return True
+
+ def do_quit (self, data):
+ return True
+
+ def postcmd (self, stop, line):
+ return stop
+
+ def postloop (self):
+ print "Exiting..."
+ self.dataObject.close ()
+
+def Usage ():
+ print sys.argv[0], "[<target-host> [<tcp-port>]]"
+ print
+ sys.exit (1)
+
+#=========================================================
+# Main Program
+#=========================================================
+
+# Get host name and port if specified on the command line
+try:
+ (optlist, cargs) = getopt.getopt (sys.argv[1:], 's:')
+except:
+ Usage ()
+
+specpath = "/usr/share/amqp/amqp.0-10-preview.xml"
+host = "localhost"
+port = 5672
+
+if "s" in optlist:
+ specpath = optlist["s"]
+
+if len (cargs) > 0:
+ host = cargs[0]
+
+if len (cargs) > 1:
+ port = int (cargs[1])
+
+print ("Management Tool for QPID")
+disp = Display ()
+
+# Attempt to make a connection to the target broker
+try:
+ data = ManagementData (disp, host, port, spec=specpath)
+except socket.error, e:
+ sys.exit (0)
+except Closed, e:
+ if str(e).find ("Exchange not found") != -1:
+ print "Management not enabled on broker: Use '-m yes' option on broker startup."
+ sys.exit (0)
+
+# Instantiate the CLI interpreter and launch it.
+cli = Mcli (data, disp)
+cli.cmdloop ()
diff --git a/qpid/python/mgmt-cli/managementdata.py b/qpid/python/mgmt-cli/managementdata.py
new file mode 100644
index 0000000000..b770677825
--- /dev/null
+++ b/qpid/python/mgmt-cli/managementdata.py
@@ -0,0 +1,417 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.management import ManagedBroker
+from threading import Lock
+from disp import Display
+from shlex import split
+
+class ManagementData:
+
+ #
+ # Data Structure:
+ #
+ # Please note that this data structure holds only the most recent
+ # configuration and instrumentation data for each object. It does
+ # not hold the detailed historical data that is sent from the broker.
+ # The only historical data it keeps are the high and low watermarks
+ # for hi-lo statistics.
+ #
+ # tables :== {<class-name>}
+ # {<obj-id>}
+ # (timestamp, config-record, inst-record)
+ # timestamp :== (<last-interval-time>, <create-time>, <delete-time>)
+ # config-record :== [element]
+ # inst-record :== [element]
+ # element :== (<element-name>, <element-value>)
+ #
+
+ def dataHandler (self, context, className, list, timestamps):
+ """ Callback for configuration and instrumentation data updates """
+ self.lock.acquire ()
+ try:
+ # If this class has not been seen before, create an empty dictionary to
+ # hold objects of this class
+ if className not in self.tables:
+ self.tables[className] = {}
+
+ # Calculate a base-id so displayed IDs are reasonable 4-digit numbers
+ id = long (list[0][1])
+ if self.baseId == 0:
+ self.baseId = id - 1000
+
+ # If this object hasn't been seen before, create a new object record with
+ # the timestamps and empty lists for configuration and instrumentation data.
+ if id not in self.tables[className]:
+ self.tables[className][id] = (timestamps, [], [])
+
+ (unused, oldConf, oldInst) = self.tables[className][id]
+
+ # For config updates, simply replace old config list with the new one.
+ if context == 0: #config
+ self.tables[className][id] = (timestamps, list, oldInst)
+
+ # For instrumentation updates, carry the minimum and maximum values for
+ # "hi-lo" stats forward.
+ elif context == 1: #inst
+ if len (oldInst) == 0:
+ newInst = list
+ else:
+ newInst = []
+ for idx in range (len (list)):
+ (key, value) = list[idx]
+ if key.find ("High") == len (key) - 4:
+ if oldInst[idx][1] > value:
+ value = oldInst[idx][1]
+ if key.find ("Low") == len (key) - 3:
+ if oldInst[idx][1] < value:
+ value = oldInst[idx][1]
+ newInst.append ((key, value))
+ self.tables[className][id] = (timestamps, oldConf, newInst)
+
+ finally:
+ self.lock.release ()
+
+ def methodReply (self, broker, methodId, status, sText, args):
+ """ Callback for method-reply messages """
+ pass
+
+ def schemaHandler (self, context, className, configs, insts, methods, events):
+ """ Callback for schema updates """
+ if className not in self.schema:
+ self.schema[className] = (configs, insts, methods, events)
+
+ def __init__ (self, disp, host, port=5672, username="guest", password="guest",
+ spec="../../specs/amqp.0-10-preview.xml"):
+ self.broker = ManagedBroker (host, port, username, password, spec)
+ self.broker.configListener (0, self.dataHandler)
+ self.broker.instrumentationListener (1, self.dataHandler)
+ self.broker.methodListener (None, self.methodReply)
+ self.broker.schemaListener (None, self.schemaHandler)
+ self.lock = Lock ()
+ self.tables = {}
+ self.schema = {}
+ self.baseId = 0
+ self.disp = disp
+ self.broker.start ()
+
+ def close (self):
+ self.broker.stop ()
+
+ def getObjIndex (self, className, config):
+ """ Concatenate the values from index columns to form a unique object name """
+ result = ""
+ schemaConfig = self.schema[className][0]
+ for item in schemaConfig:
+ if item[5] == 1 and item[0] != "id":
+ if result != "":
+ result = result + "."
+ for key,val in config:
+ if key == item[0]:
+ if key.find ("Ref") != -1:
+ val = val - self.baseId
+ result = result + str (val)
+ return result
+
+ def classCompletions (self, prefix):
+ """ Provide a list of candidate class names for command completion """
+ self.lock.acquire ()
+ complist = []
+ try:
+ for name in self.tables:
+ if name.find (prefix) == 0:
+ complist.append (name)
+ finally:
+ self.lock.release ()
+ return complist
+
+ def typeName (self, typecode):
+ """ Convert type-codes to printable strings """
+ if typecode == 1:
+ return "uint8"
+ elif typecode == 2:
+ return "uint16"
+ elif typecode == 3:
+ return "uint32"
+ elif typecode == 4:
+ return "uint64"
+ elif typecode == 5:
+ return "bool"
+ elif typecode == 6:
+ return "short-string"
+ elif typecode == 7:
+ return "long-string"
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def accessName (self, code):
+ """ Convert element access codes to printable strings """
+ if code == 1:
+ return "ReadCreate"
+ elif code == 2:
+ return "ReadWrite"
+ elif code == 3:
+ return "ReadOnly"
+ else:
+ raise ValueErrir ("Invalid access code: %d" %code)
+
+ def notNone (self, text):
+ if text == None:
+ return ""
+ else:
+ return text
+
+ def listOfIds (self, className, tokens):
+ """ Generate a tuple of object ids for a classname based on command tokens. """
+ list = []
+ if tokens[0] == "all":
+ for id in self.tables[className]:
+ list.append (id - self.baseId)
+
+ else:
+ for token in tokens:
+ if token.find ("-") != -1:
+ ids = token.split("-", 2)
+ for id in range (int (ids[0]), int (ids[1]) + 1):
+ if self.getClassForId (long (id) + self.baseId) == className:
+ list.append (id)
+ else:
+ list.append (token)
+
+ list.sort ()
+ result = ()
+ for item in list:
+ result = result + (item,)
+ return result
+
+ def listClasses (self):
+ """ Generate a display of the list of classes """
+ self.lock.acquire ()
+ try:
+ rows = []
+ sorted = self.tables.keys ()
+ sorted.sort ()
+ for name in sorted:
+ active = 0
+ deleted = 0
+ for record in self.tables[name]:
+ isdel = False
+ ts = self.tables[name][record][0]
+ if ts[2] > 0:
+ isdel = True
+ if isdel:
+ deleted = deleted + 1
+ else:
+ active = active + 1
+ rows.append ((name, active, deleted))
+ self.disp.table ("Management Object Types:",
+ ("ObjectType", "Active", "Deleted"), rows)
+ finally:
+ self.lock.release ()
+
+ def listObjects (self, className):
+ """ Generate a display of a list of objects in a class """
+ self.lock.acquire ()
+ try:
+ if className not in self.tables:
+ print ("Object type %s not known" % className)
+ else:
+ rows = []
+ sorted = self.tables[className].keys ()
+ sorted.sort ()
+ for objId in sorted:
+ (ts, config, inst) = self.tables[className][objId]
+ createTime = self.disp.timestamp (ts[1])
+ destroyTime = "-"
+ if ts[2] > 0:
+ destroyTime = self.disp.timestamp (ts[2])
+ objIndex = self.getObjIndex (className, config)
+ row = (objId - self.baseId, createTime, destroyTime, objIndex)
+ rows.append (row)
+ self.disp.table ("Objects of type %s" % className,
+ ("ID", "Created", "Destroyed", "Index"),
+ rows)
+ finally:
+ self.lock.release ()
+
+ def showObjects (self, tokens):
+ """ Generate a display of object data for a particular class """
+ self.lock.acquire ()
+ try:
+ className = tokens[0]
+ if className not in self.tables:
+ print "Class not known: %s" % className
+ raise ValueError ()
+
+ userIds = self.listOfIds (className, tokens[1:])
+ if len (userIds) == 0:
+ print "No object IDs supplied"
+ raise ValueError ()
+
+ ids = []
+ for id in userIds:
+ if self.getClassForId (long (id) + self.baseId) == className:
+ ids.append (long (id) + self.baseId)
+
+ rows = []
+ config = self.tables[className][ids[0]][1]
+ for eIdx in range (len (config)):
+ key = config[eIdx][0]
+ if key != "id":
+ isRef = key.find ("Ref") == len (key) - 3
+ row = ("config", key)
+ for id in ids:
+ value = self.tables[className][id][1][eIdx][1]
+ if isRef:
+ value = value - self.baseId
+ row = row + (value,)
+ rows.append (row)
+
+ inst = self.tables[className][ids[0]][2]
+ for eIdx in range (len (inst)):
+ key = inst[eIdx][0]
+ if key != "id":
+ isRef = key.find ("Ref") == len (key) - 3
+ row = ("inst", key)
+ for id in ids:
+ value = self.tables[className][id][2][eIdx][1]
+ if isRef:
+ value = value - self.baseId
+ row = row + (value,)
+ rows.append (row)
+
+ titleRow = ("Type", "Element")
+ for id in ids:
+ titleRow = titleRow + (str (id - self.baseId),)
+ self.disp.table ("Object of type %s:" % className, titleRow, rows)
+
+ except:
+ pass
+ self.lock.release ()
+
+ def schemaSummary (self):
+ """ Generate a display of the list of classes in the schema """
+ self.lock.acquire ()
+ try:
+ rows = []
+ sorted = self.schema.keys ()
+ sorted.sort ()
+ for className in sorted:
+ tuple = self.schema[className]
+ row = (className, len (tuple[0]), len (tuple[1]), len (tuple[2]), len (tuple[3]))
+ rows.append (row)
+ self.disp.table ("Classes in Schema:",
+ ("Class", "ConfigElements", "InstElements", "Methods", "Events"),
+ rows)
+ finally:
+ self.lock.release ()
+
+ def schemaTable (self, className):
+ """ Generate a display of details of the schema of a particular class """
+ self.lock.acquire ()
+ try:
+ if className not in self.schema:
+ print ("Class name %s not known" % className)
+ raise ValueError ()
+
+ rows = []
+ for config in self.schema[className][0]:
+ name = config[0]
+ if name != "id":
+ typename = self.typeName(config[1])
+ unit = self.notNone (config[2])
+ desc = self.notNone (config[3])
+ access = self.accessName (config[4])
+ extra = ""
+ if config[5] == 1:
+ extra = extra + "index "
+ if config[6] != None:
+ extra = extra + "Min: " + str (config[6])
+ if config[7] != None:
+ extra = extra + "Max: " + str (config[7])
+ if config[8] != None:
+ extra = extra + "MaxLen: " + str (config[8])
+ rows.append ((name, typename, unit, access, extra, desc))
+
+ for config in self.schema[className][1]:
+ name = config[0]
+ if name != "id":
+ typename = self.typeName(config[1])
+ unit = self.notNone (config[2])
+ desc = self.notNone (config[3])
+ rows.append ((name, typename, unit, "", "", desc))
+
+ titles = ("Element", "Type", "Unit", "Access", "Notes", "Description")
+ self.disp.table ("Schema for class '%s':" % className, titles, rows)
+
+ for method in self.schema[className][2]:
+ mname = method[0]
+ mdesc = method[1]
+ args = method[2]
+ caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc))
+ rows = []
+ for arg in args:
+ name = arg[0]
+ typename = self.typeName (arg[1])
+ dir = arg[2]
+ unit = self.notNone (arg[3])
+ desc = self.notNone (arg[4])
+ extra = ""
+ if arg[5] != None:
+ extra = extra + "Min: " + str (arg[5])
+ if arg[6] != None:
+ extra = extra + "Max: " + str (arg[6])
+ if arg[7] != None:
+ extra = extra + "MaxLen: " + str (arg[7])
+ if arg[8] != None:
+ extra = extra + "Default: " + str (arg[8])
+ rows.append ((name, typename, dir, unit, extra, desc))
+ titles = ("Argument", "Type", "Direction", "Unit", "Notes", "Description")
+ self.disp.table (caption, titles, rows)
+
+ except:
+ pass
+ self.lock.release ()
+
+ def getClassForId (self, objId):
+ """ Given an object ID, return the class name for the referenced object """
+ for className in self.tables:
+ if objId in self.tables[className]:
+ return className
+ return None
+
+ def do_list (self, data):
+ tokens = data.split ()
+ if len (tokens) == 0:
+ self.listClasses ()
+ elif len (tokens) == 1:
+ self.listObjects (data)
+ else:
+ self.showObjects (tokens)
+
+ def do_schema (self, data):
+ if data == "":
+ self.schemaSummary ()
+ else:
+ self.schemaTable (data)
+
+ def do_call (self, data):
+ print "Not yet implemented"
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index a5ad997a24..8373ecceb7 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -31,17 +31,74 @@ from qpid.client import Client
from qpid.content import Content
from cStringIO import StringIO
from codec import Codec, EOF
+from threading import Lock
+
+
+class SequenceManager:
+ def __init__ (self):
+ self.lock = Lock ()
+ self.sequence = 0
+ self.pending = {}
+
+ def reserve (self, data):
+ self.lock.acquire ()
+ result = self.sequence
+ self.sequence = self.sequence + 1
+ self.pending[result] = data
+ self.lock.release ()
+ return result
+
+ def release (self, seq):
+ data = None
+ self.lock.acquire ()
+ if seq in self.pending:
+ data = self.pending[seq]
+ del self.pending[seq]
+ self.lock.release ()
+ return data
-#===================================================================
-# ManagementMetadata
-#
-# One instance of this class is created for each ManagedBroker. It
-# is used to store metadata from the broker which is needed for the
-# proper interpretation of recevied management content.
-#
-#===================================================================
class ManagementMetadata:
-
+ """One instance of this class is created for each ManagedBroker. It
+ is used to store metadata from the broker which is needed for the
+ proper interpretation of received management content."""
+
+ def encodeValue (self, codec, value, typecode):
+ if typecode == 1:
+ codec.encode_octet (int (value))
+ elif typecode == 2:
+ codec.encode_short (int (value))
+ elif typecode == 3:
+ codec.encode_long (long (value))
+ elif typecode == 4:
+ codec.encode_longlong (long (value))
+ elif typecode == 5:
+ codec.encode_octet (int (value))
+ elif typecode == 6:
+ codec.encode_shortstr (value)
+ elif typecode == 7:
+ codec.encode_longstr (value)
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def decodeValue (self, codec, typecode):
+ if typecode == 1:
+ data = codec.decode_octet ()
+ elif typecode == 2:
+ data = codec.decode_short ()
+ elif typecode == 3:
+ data = codec.decode_long ()
+ elif typecode == 4:
+ data = codec.decode_longlong ()
+ elif typecode == 5:
+ data = codec.decode_octet ()
+ elif typecode == 6:
+ data = codec.decode_shortstr ()
+ elif typecode == 7:
+ data = codec.decode_longstr ()
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+ return data
+
def parseSchema (self, cls, codec):
className = codec.decode_shortstr ()
configCount = codec.decode_short ()
@@ -100,12 +157,56 @@ class ManagementMetadata:
inst = (name, type, unit, desc)
insts.append (inst)
- # TODO: Handle notification of schema change outbound
+ for idx in range (methodCount):
+ ft = codec.decode_table ()
+ mname = ft["name"]
+ argCount = ft["argCount"]
+ if "desc" in ft:
+ mdesc = ft["desc"]
+ else:
+ mdesc = None
+
+ args = []
+ for aidx in range (argCount):
+ ft = codec.decode_table ()
+ name = ft["name"]
+ type = ft["type"]
+ dir = ft["dir"].upper ()
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
+ default = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = value
+ elif key == "min":
+ min = value
+ elif key == "max":
+ max = value
+ elif key == "maxlen":
+ maxlen = value
+ elif key == "desc":
+ desc = value
+ elif key == "default":
+ default = value
+
+ arg = (name, type, dir, unit, desc, min, max, maxlen, default)
+ args.append (arg)
+ methods.append ((mname, mdesc, args))
+
+
self.schema[(className,'C')] = configs
self.schema[(className,'I')] = insts
self.schema[(className,'M')] = methods
self.schema[(className,'E')] = events
+ if self.broker.schema_cb != None:
+ self.broker.schema_cb[1] (self.broker.schema_cb[0], className,
+ configs, insts, methods, events)
+
def parseContent (self, cls, codec):
if cls == 'C' and self.broker.config_cb == None:
return
@@ -127,22 +228,7 @@ class ManagementMetadata:
for element in self.schema[(className,cls)][:]:
tc = element[1]
name = element[0]
- if tc == 1: # TODO: Define constants for these
- data = codec.decode_octet ()
- elif tc == 2:
- data = codec.decode_short ()
- elif tc == 3:
- data = codec.decode_long ()
- elif tc == 4:
- data = codec.decode_longlong ()
- elif tc == 5:
- data = codec.decode_octet ()
- elif tc == 6:
- data = codec.decode_shortstr ()
- elif tc == 7:
- data = codec.decode_longstr ()
- else:
- raise ValueError ("Invalid type code: %d" % tc)
+ data = self.decodeValue (codec, tc)
row.append ((name, data))
if cls == 'C':
@@ -168,14 +254,9 @@ class ManagementMetadata:
self.schema = {}
-#===================================================================
-# ManagedBroker
-#
-# An object of this class represents a connection (over AMQP) to a
-# single managed broker.
-#
-#===================================================================
class ManagedBroker:
+ """An object of this class represents a connection (over AMQP) to a
+ single managed broker."""
mExchange = "qpid.management"
dExchange = "amq.direct"
@@ -205,18 +286,35 @@ class ManagedBroker:
msg.complete ()
def reply_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
- methodId = codec.decode_long ()
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ sequence = codec.decode_long ()
status = codec.decode_long ()
sText = codec.decode_shortstr ()
- args = {}
+ data = self.sequenceManager.release (sequence)
+ if data == None:
+ msg.complete ()
+ return
+
+ (userSequence, className, methodName) = data
+
if status == 0:
- args["sequence"] = codec.decode_long ()
- args["body"] = codec.decode_longstr ()
+ ms = self.metadata.schema[(className,'M')]
+ arglist = None
+ for (mname, mdesc, margs) in ms:
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ msg.complete ()
+ return
+
+ args = {}
+ for arg in arglist:
+ if arg[2].find("O") != -1:
+ args[arg[0]] = self.metadata.decodeValue (codec, arg[1])
if self.method_cb != None:
- self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
+ self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args)
msg.complete ()
@@ -225,17 +323,18 @@ class ManagedBroker:
port = 5672,
username = "guest",
password = "guest",
- specfile = "../specs/amqp.0-10-preview.xml"):
-
- self.spec = qpid.spec.load (specfile)
- self.client = None
- self.channel = None
- self.queue = None
- self.rqueue = None
- self.qname = None
- self.rqname = None
- self.metadata = ManagementMetadata (self)
- self.connected = 0
+ specfile = "/usr/share/amqp/amqp.0-10-preview.xml"):
+
+ self.spec = qpid.spec.load (specfile)
+ self.client = None
+ self.channel = None
+ self.queue = None
+ self.rqueue = None
+ self.qname = None
+ self.rqname = None
+ self.metadata = ManagementMetadata (self)
+ self.sequenceManager = SequenceManager ()
+ self.connected = 0
self.lastConnectError = None
# Initialize the callback records
@@ -265,17 +364,37 @@ class ManagedBroker:
def instrumentationListener (self, context, callback):
self.inst_cb = (context, callback)
- def method (self, methodId, objId, className,
+ def method (self, userSequence, objId, className,
methodName, args=None, packageName="qpid"):
codec = Codec (StringIO (), self.spec);
- codec.encode_long (methodId)
- codec.encode_longlong (objId)
- codec.encode_shortstr (self.rqname)
-
- # TODO: Encode args according to schema
- if methodName == "echo":
- codec.encode_long (args["sequence"])
- codec.encode_longstr (args["body"])
+ sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+ codec.encode_long (sequence) # Method sequence id
+ codec.encode_longlong (objId) # ID of object
+ codec.encode_shortstr (self.rqname) # name of reply queue
+
+ # Encode args according to schema
+ if (className,'M') not in self.metadata.schema:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Unknown class name: %s" % className)
+
+ ms = self.metadata.schema[(className,'M')]
+ arglist = None
+ for (mname, mdesc, margs) in ms:
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Unknown method name: %s" % methodName)
+
+ for arg in arglist:
+ if arg[2].find("I") != -1:
+ value = arg[8] # default
+ if arg[0] in args:
+ value = args[arg[0]]
+ if value == None:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
+ self.metadata.encodeValue (codec, value, arg[1])
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
@@ -325,7 +444,7 @@ class ManagedBroker:
self.connected = 1
except socket.error, e:
- print "Socket Error Detected:", e[1]
+ print "Socket Error:", e[1]
self.lastConnectError = e
raise
except:
diff --git a/qpid/python/qpid/management.py.rej b/qpid/python/qpid/management.py.rej
new file mode 100644
index 0000000000..28d172abe5
--- /dev/null
+++ b/qpid/python/qpid/management.py.rej
@@ -0,0 +1,457 @@
+***************
+*** 18,24 ****
+ #
+
+ """
+- Management classes for AMQP
+ """
+
+ import qpid
+--- 18,24 ----
+ #
+
+ """
++ Management API for Qpid
+ """
+
+ import qpid
+***************
+*** 42,91 ****
+ #===================================================================
+ class ManagementMetadata:
+
+- def parseSchema (self, cls, oid, len, codec):
+- #print "Schema Record: objId=", oid
+
+- config = []
+- inst = []
+- while 1:
+- flags = codec.decode_octet ()
+- if flags == 0x80:
+- break
+
+- tc = codec.decode_octet ()
+- name = codec.decode_shortstr ()
+- desc = codec.decode_shortstr ()
+
+- if flags & 1: # TODO: Define constants for these
+- config.append ((tc, name, desc))
+- if (flags & 1) == 0 or (flags & 2) == 2:
+- inst.append ((tc, name, desc))
+
+ # TODO: Handle notification of schema change outbound
+- self.schema[(oid,'C')] = config
+- self.schema[(oid,'I')] = inst
+
+- def parseContent (self, cls, oid, len, codec):
+- #print "Content Record: Class=", cls, ", objId=", oid
+-
+ if cls == 'C' and self.broker.config_cb == None:
+ return
+ if cls == 'I' and self.broker.inst_cb == None:
+ return
+
+- if (oid,cls) not in self.schema:
+ return
+
+ row = []
+ timestamps = []
+
+- timestamps.append (codec.decode_longlong ()); # Current Time
+- timestamps.append (codec.decode_longlong ()); # Create Time
+- timestamps.append (codec.decode_longlong ()); # Delete Time
+
+- for element in self.schema[(oid,cls)][:]:
+- tc = element[0]
+- name = element[1]
+ if tc == 1: # TODO: Define constants for these
+ data = codec.decode_octet ()
+ elif tc == 2:
+--- 42,132 ----
+ #===================================================================
+ class ManagementMetadata:
+
++ def parseSchema (self, cls, codec):
++ className = codec.decode_shortstr ()
++ configCount = codec.decode_short ()
++ instCount = codec.decode_short ()
++ methodCount = codec.decode_short ()
++ eventCount = codec.decode_short ()
+
++ configs = []
++ insts = []
++ methods = []
++ events = []
+
++ configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
++ insts.append (("id", 4, None, None))
+
++ for idx in range (configCount):
++ ft = codec.decode_table ()
++ name = ft["name"]
++ type = ft["type"]
++ access = ft["access"]
++ index = ft["index"]
++ unit = None
++ min = None
++ max = None
++ maxlen = None
++ desc = None
+
++ for key, value in ft.items ():
++ if key == "unit":
++ unit = value
++ elif key == "min":
++ min = value
++ elif key == "max":
++ max = value
++ elif key == "maxlen":
++ maxlen = value
++ elif key == "desc":
++ desc = value
++
++ config = (name, type, unit, desc, access, index, min, max, maxlen)
++ configs.append (config)
++
++ for idx in range (instCount):
++ ft = codec.decode_table ()
++ name = ft["name"]
++ type = ft["type"]
++ unit = None
++ desc = None
++
++ for key, value in ft.items ():
++ if key == "unit":
++ unit = value
++ elif key == "desc":
++ desc = value
++
++ inst = (name, type, unit, desc)
++ insts.append (inst)
++
+ # TODO: Handle notification of schema change outbound
++ self.schema[(className,'C')] = configs
++ self.schema[(className,'I')] = insts
++ self.schema[(className,'M')] = methods
++ self.schema[(className,'E')] = events
+
++ def parseContent (self, cls, codec):
+ if cls == 'C' and self.broker.config_cb == None:
+ return
+ if cls == 'I' and self.broker.inst_cb == None:
+ return
+
++ className = codec.decode_shortstr ()
++
++ if (className,cls) not in self.schema:
+ return
+
+ row = []
+ timestamps = []
+
++ timestamps.append (codec.decode_longlong ()) # Current Time
++ timestamps.append (codec.decode_longlong ()) # Create Time
++ timestamps.append (codec.decode_longlong ()) # Delete Time
+
++ for element in self.schema[(className,cls)][:]:
++ tc = element[1]
++ name = element[0]
+ if tc == 1: # TODO: Define constants for these
+ data = codec.decode_octet ()
+ elif tc == 2:
+***************
+*** 98,130 ****
+ data = codec.decode_octet ()
+ elif tc == 6:
+ data = codec.decode_shortstr ()
+ row.append ((name, data))
+
+ if cls == 'C':
+- self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps)
+- if cls == 'I':
+- self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps)
+
+ def parse (self, codec):
+- try:
+- opcode = chr (codec.decode_octet ())
+- except EOF:
+- return 0
+
+- cls = chr (codec.decode_octet ())
+- oid = codec.decode_short ()
+- len = codec.decode_long ()
+-
+- if len < 8:
+- raise ValueError ("parse error: value of length field too small")
+-
+ if opcode == 'S':
+- self.parseSchema (cls, oid, len, codec)
+
+- if opcode == 'C':
+- self.parseContent (cls, oid, len, codec)
+
+- return 1
+
+ def __init__ (self, broker):
+ self.broker = broker
+--- 139,167 ----
+ data = codec.decode_octet ()
+ elif tc == 6:
+ data = codec.decode_shortstr ()
++ elif tc == 7:
++ data = codec.decode_longstr ()
++ else:
++ raise ValueError ("Invalid type code: %d" % tc)
+ row.append ((name, data))
+
+ if cls == 'C':
++ self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
++ elif cls == 'I':
++ self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps)
+
+ def parse (self, codec):
++ opcode = chr (codec.decode_octet ())
++ cls = chr (codec.decode_octet ())
+
+ if opcode == 'S':
++ self.parseSchema (cls, codec)
+
++ elif opcode == 'C':
++ self.parseContent (cls, codec)
+
++ else:
++ raise ValueError ("Unknown opcode: %c" % opcode);
+
+ def __init__ (self, broker):
+ self.broker = broker
+***************
+*** 140,146 ****
+ #===================================================================
+ class ManagedBroker:
+
+- exchange = "qpid.management"
+
+ def checkHeader (self, codec):
+ octet = chr (codec.decode_octet ())
+--- 177,184 ----
+ #===================================================================
+ class ManagedBroker:
+
++ mExchange = "qpid.management"
++ dExchange = "amq.direct"
+
+ def checkHeader (self, codec):
+ octet = chr (codec.decode_octet ())
+***************
+*** 157,225 ****
+ return 0
+ return 1
+
+- def receive_cb (self, msg):
+ codec = Codec (StringIO (msg.content.body), self.spec)
+
+ if self.checkHeader (codec) == 0:
+ raise ValueError ("outer header invalid");
+
+- while self.metadata.parse (codec):
+- pass
+
+ msg.complete ()
+
+- def __init__ (self, host = "localhost", port = 5672,
+- username = "guest", password = "guest"):
+
+- self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml")
+- self.client = None
+- self.channel = None
+- self.queue = None
+- self.qname = None
+- self.metadata = ManagementMetadata (self)
+
+ # Initialize the callback records
+ self.schema_cb = None
+ self.config_cb = None
+ self.inst_cb = None
+
+ self.host = host
+ self.port = port
+ self.username = username
+ self.password = password
+
+ def schemaListener (self, context, callback):
+ self.schema_cb = (context, callback)
+
+ def configListener (self, context, callback):
+ self.config_cb = (context, callback)
+
+ def instrumentationListener (self, context, callback):
+ self.inst_cb = (context, callback)
+
+ def start (self):
+- print "Connecting to broker", self.host
+
+ try:
+ self.client = Client (self.host, self.port, self.spec)
+ self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
+ self.channel = self.client.channel (1)
+- response = self.channel.session_open (detached_lifetime=300)
+- self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id)
+
+- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
+- self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname,
+- routing_key="mgmt")
+- self.channel.message_subscribe (queue=self.qname, destination="dest")
+- self.queue = self.client.queue ("dest")
+- self.queue.listen (self.receive_cb)
+
+- self.channel.message_flow_mode (destination="dest", mode=1)
+- self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF)
+- self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF)
+
+ except socket.error, e:
+ print "Socket Error Detected:", e[1]
+ raise
+ except:
+ raise
+--- 195,335 ----
+ return 0
+ return 1
+
++ def publish_cb (self, msg):
+ codec = Codec (StringIO (msg.content.body), self.spec)
+
+ if self.checkHeader (codec) == 0:
+ raise ValueError ("outer header invalid");
+
++ self.metadata.parse (codec)
++ msg.complete ()
+
++ def reply_cb (self, msg):
++ codec = Codec (StringIO (msg.content.body), self.spec)
++ methodId = codec.decode_long ()
++ status = codec.decode_long ()
++ sText = codec.decode_shortstr ()
++
++ args = {}
++ if status == 0:
++ args["sequence"] = codec.decode_long ()
++ args["body"] = codec.decode_longstr ()
++
++ if self.method_cb != None:
++ self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
++
+ msg.complete ()
+
++ def __init__ (self,
++ host = "localhost",
++ port = 5672,
++ username = "guest",
++ password = "guest",
++ specfile = "../specs/amqp.0-10-preview.xml"):
+
++ self.spec = qpid.spec.load (specfile)
++ self.client = None
++ self.channel = None
++ self.queue = None
++ self.rqueue = None
++ self.qname = None
++ self.rqname = None
++ self.metadata = ManagementMetadata (self)
++ self.connected = 0
++ self.lastConnectError = None
+
+ # Initialize the callback records
++ self.status_cb = None
+ self.schema_cb = None
+ self.config_cb = None
+ self.inst_cb = None
++ self.method_cb = None
+
+ self.host = host
+ self.port = port
+ self.username = username
+ self.password = password
+
++ def statusListener (self, context, callback):
++ self.status_cb = (context, callback)
++
+ def schemaListener (self, context, callback):
+ self.schema_cb = (context, callback)
+
+ def configListener (self, context, callback):
+ self.config_cb = (context, callback)
+
++ def methodListener (self, context, callback):
++ self.method_cb = (context, callback)
++
+ def instrumentationListener (self, context, callback):
+ self.inst_cb = (context, callback)
+
++ def method (self, methodId, objId, className,
++ methodName, args=None, packageName="qpid"):
++ codec = Codec (StringIO (), self.spec);
++ codec.encode_long (methodId)
++ codec.encode_longlong (objId)
++ codec.encode_shortstr (self.rqname)
++
++ # TODO: Encode args according to schema
++ if methodName == "echo":
++ codec.encode_long (args["sequence"])
++ codec.encode_longstr (args["body"])
++
++ msg = Content (codec.stream.getvalue ())
++ msg["content_type"] = "application/octet-stream"
++ msg["routing_key"] = "method." + packageName + "." + className + "." + methodName
++ msg["reply_to"] = self.spec.struct ("reply_to")
++ self.channel.message_transfer (destination="qpid.management", content=msg)
++
++ def isConnected (self):
++ return connected
++
+ def start (self):
++ print "Connecting to broker %s:%d" % (self.host, self.port)
+
+ try:
+ self.client = Client (self.host, self.port, self.spec)
+ self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
+ self.channel = self.client.channel (1)
++ response = self.channel.session_open (detached_lifetime=10)
++ self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
++ self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
+
++ self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
++ self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
++
++ self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
++ routing_key="mgmt.#")
++ self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
++ routing_key=self.rqname)
+
++ self.channel.message_subscribe (queue=self.qname, destination="mdest")
++ self.channel.message_subscribe (queue=self.rqname, destination="rdest")
+
++ self.queue = self.client.queue ("mdest")
++ self.queue.listen (self.publish_cb)
++
++ self.channel.message_flow_mode (destination="mdest", mode=1)
++ self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
++ self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
++
++ self.rqueue = self.client.queue ("rdest")
++ self.rqueue.listen (self.reply_cb)
++
++ self.channel.message_flow_mode (destination="rdest", mode=1)
++ self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
++ self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
++
++ self.connected = 1
++
+ except socket.error, e:
+ print "Socket Error Detected:", e[1]
++ self.lastConnectError = e
+ raise
+ except:
+ raise
++
++ def stop (self):
++ pass
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 3105c5e055..2147122f0a 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -109,27 +109,6 @@
<configElement name="durable" type="bool" access="RC"/>
<configElement name="autoDelete" type="bool" access="RC"/>
<configElement name="exclusive" type="bool" access="RC"/>
- <configElement name="pageMemoryLimit" type="uint32" access="RO"/>
-
- <!-- Persistent Journal Support -->
- <instElement name="journalLocation" type="sstr" desc="Logical directory on disk"/>
- <instElement name="journalBaseFileName" type="sstr" desc="Base filename prefix for journal"/>
- <instElement name="journalInitialFileCount" type="uint32" desc="Number of files initially allocated to this journal"/>
- <instElement name="journalCurrentFileCount" type="uint32" desc="Number of files currently allocated to this journal"/>
- <instElement name="journalDataFileSize" type="uint32" unit="byte" desc="Size of each journal data file"/>
- <instElement name="journalFreeFileCount" type="hilo32" desc="Number of files free on this journal. Includes free files trapped in holes."/>
- <instElement name="journalAvailableFileCount" type="hilo32" desc="Number of files available to be written. Excluding holes"/>
- <instElement name="journalRecordDepth" type="hilo32" unit="record" desc="Number of enqueued records (durable messages)"/>
- <instElement name="journalRecordEnqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
- <instElement name="journalRecordDequeues" type="count64" unit="record" desc="Total dequeued records on journal"/>
- <instElement name="journalWriteWaitFailures" type="count64" unit="record" desc="AIO Wait failures on write"/>
- <instElement name="journalWriteBusyFailures" type="count64" unit="record" desc="AIO Busy failures on write"/>
- <instElement name="journalReadRecordCount" type="count64" unit="record" desc="Records read from the journal"/>
- <instElement name="journalReadBusyFailures" type="count64" unit="record" desc="AIO Busy failures on read"/>
- <instElement name="journalWritePageCacheDepth" type="hilo32" unit="page" desc="Current depth of write-page-cache"/>
- <instElement name="journalWritePageSize" type="uint32" unit="byte" desc="Page size in write-page-cache"/>
- <instElement name="journalReadPageCacheDepth" type="hilo32" unit="page" desc="Current depth of read-page-cache"/>
- <instElement name="journalReadPageSize" type="uint32" unit="byte" desc="Page size in read-page-cache"/>
<instElement name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/>
<instElement name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/>