diff options
author: Ted Ross <> 2010-02-05 22:47:56 +0000
committer: Ted Ross <> 2010-02-05 22:47:56 +0000
commit: 2b86496e7e980834464c35f49cbf7337815aeb4c (patch)
parent: 21fbb39e510e668f8dbcf221a8d4ba8c3a84f6b7 (diff)
QPID-2029 - Added verification script contributed by John Dunning
git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
1 files changed, 400 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/verify_cluster_objects b/qpid/cpp/src/tests/verify_cluster_objects
new file mode 100644
index 0000000000..cea875662f
--- /dev/null
+++ b/qpid/cpp/src/tests/verify_cluster_objects
@@ -0,0 +1,400 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, SchemaClass
+# Defaults; the option loop in the main program may override the first four.
+# Broker address used for the initial connection.
+_host = "localhost"
+# Seconds to wait for a broker connection.
+_connTimeout = 10
+# Verbosity level; higher values print more detail.
+_verbose = 0
+# When True, verify() runs its create-then-delete test.
+_del_test = False
+# Matches a dotted-quad address followed by a port.
+pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
+# Nesting counter that Broker.getAbstractId prints in front of debug output.
+_debug_recursion = 0
+def Usage():
+    """Print the command-line help text, then exit with status 1."""
+    print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]"
+    print
+    print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+    # The ip-address:port example had been lost, leaving an empty list entry.
+    print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+    print
+    print " This program contacts every node of a cluster, loads all manageable objects from"
+    print " those nodes and verifies that the management data is identical across the clusters."
+    print
+    print "Options:"
+    print " --timeout seconds (10) Maximum time to wait for broker connection"
+    print " --verbose level (0) Show details of objects and their IDs"
+    print " --delete Delete some objects after creation, to test synchup"
+    print
+    sys.exit(1)
+class IpAddr:
+    """An IPv4 address and port parsed from a broker address string.
+
+    Attributes set by the constructor:
+      port       -- integer port; 5672 when the text does not name one
+      dottedQuad -- the string returned by socket.gethostbyname for the host
+      addr       -- the four dotted-quad fields packed into a single integer
+    """
+
+    def __init__(self, text):
+        """Parse "[credentials@]host[:port]" and resolve the host part."""
+        # Discard anything up to the "@" (the optional credentials).
+        if text.find("@") != -1:
+            tokens = text.split("@")
+            text = tokens[1]
+        if text.find(":") != -1:
+            tokens = text.split(":")
+            text = tokens[0]
+            self.port = int(tokens[1])
+        else:
+            self.port = 5672
+        self.dottedQuad = socket.gethostbyname(text)
+        nums = self.dottedQuad.split(".")
+        self.addr = ((int(nums[0]) << 24) + (int(nums[1]) << 16) +
+                     (int(nums[2]) << 8) + int(nums[3]))
+
+    def bestAddr(self, addrPortList):
+        """Return the entry of addrPortList whose address is nearest to this one.
+
+        addrPortList is a sequence whose items have the address at index 0.
+        Nearness is the XOR of the packed addresses; the smallest value
+        wins, which favours addresses sharing the most high-order bits.
+        The first entry wins a tie.  Returns None for an empty list.
+        """
+        bestDiff = None
+        bestAddr = None
+        for addrPort in addrPortList:
+            diff = IpAddr(addrPort[0]).addr ^ self.addr
+            # Comparing against None first means a candidate at the maximum
+            # distance (0xFFFFFFFF) is still chosen when nothing is nearer.
+            if bestDiff is None or diff < bestDiff:
+                bestDiff = diff
+                bestAddr = addrPort
+        return bestAddr
+class Broker(object):
+    """Management objects loaded from one cluster node over a QMF session.
+
+    tablesByName maps a class name to a dict that maps each object's
+    abstract id (see getAbstractId) to the object itself.
+    """
+
+    def __init__(self, qmf, broker):
+        """Remember the connection and read the node's broker object.
+
+        qmf    -- the QMF console session used for all queries
+        broker -- the broker connection this instance represents
+        """
+        self.broker = broker
+        self.qmf = qmf
+        # The agent whose bank is 0 is the one used for every query below.
+        agents = qmf.getAgents()
+        for a in agents:
+            if a.getAgentBank() == 0:
+                self.brokerAgent = a
+        bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
+                              _agent=self.brokerAgent)[0]
+        self.currentTime = bobj.getTimestamps()[0]
+        # Best effort: fall back to 0 when the broker object has no uptime.
+        try:
+            self.uptime = bobj.uptime
+        except Exception:
+            self.uptime = 0
+        self.tablesByName = {}
+        self.package = "org.apache.qpid.broker"
+
+    def getUrl(self):
+        """Return the URL of the underlying broker connection."""
+        return self.broker.getUrl()
+
+    def getData(self):
+        """Load every table-kind class of self.package into tablesByName."""
+        if _verbose > 1:
+            print "Broker:", self.broker
+        classList = self.qmf.getClasses(self.package)
+        for cls in classList:
+            if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
+                self.loadTable(cls)
+
+    #
+    # this should be a method on an object, but is kept here for now, until
+    # we finish sorting out the treatment of names in qmfv2
+    #
+    def getAbstractId(self, object):
+        """Return a string holding the hierarchical name of object.
+
+        The name is the ":"-joined values of the object's index properties;
+        reference-typed index properties are expanded recursively.  Returns
+        "" for any object that has a "systemRef" property.
+        """
+        global _debug_recursion
+        _debug_recursion += 1
+        debug_prefix = _debug_recursion
+        try:
+            return self._buildAbstractId(object, debug_prefix)
+        finally:
+            # Keep the depth counter balanced on every exit path.
+            _debug_recursion -= 1
+
+    def _buildAbstractId(self, object, debug_prefix):
+        """Build the id string for getAbstractId; debug_prefix tags debug output."""
+        debug = _verbose > 9
+        result = u""
+        valstr = u""
+        if debug:
+            print debug_prefix, " enter gai: props ", object._properties
+        for property, value in object._properties:
+            # we want to recurse on things which are refs. we tell by
+            # asking each property if it's an index. I think...
+            if debug:
+                print debug_prefix, " prop ", property, " val " , value, " idx ", \
+                    property.index, " type ", property.type
+            # property is an instance, you can ask its type, name, etc.
+            # special case system refs, as they will never be the same on
+            # distinct cluster nodes. later we probably want a different
+            # way of representing these objects, like for instance don't
+            # include the system ref in the hierarchy.
+            if property.name == "systemRef":
+                return ""
+            if not property.index:
+                continue
+            if result != u"":
+                result += u":"
+            # NOTE(review): type code 10 is presumably the QMF object-reference
+            # type -- confirm against the qmf.console type codes.
+            if property.type == 10:
+                try:
+                    recursive_objects = object._session.getObjects(
+                        _objectId=value, _broker=object._broker)
+                    if debug:
+                        print debug_prefix, " r ", recursive_objects[0]
+                        for rp, rv in recursive_objects[0]._properties:
+                            print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv
+                        print debug_prefix, " recursing on ", recursive_objects[0]
+                    valstr = self.getAbstractId(recursive_objects[0])
+                    if debug:
+                        print debug_prefix, " recursing on ", recursive_objects[0], \
+                            " -> ", valstr
+                except Exception, e:
+                    if debug:
+                        print debug_prefix, " except ", e
+                    valstr = u"<undecodable>"
+            else:
+                # repr() of the value is not ideal for every type, but there
+                # is no general way here to produce a friendlier id string.
+                if property.name == "systemId":
+                    # special case. try to do something sensible about systemref objects
+                    valstr = object.nodeName
+                else:
+                    valstr = value.__repr__() # I think...
+            result += valstr
+        if debug:
+            print debug_prefix, " id ", self, " -> ", result
+        return result
+
+    def loadTable(self, cls):
+        """Fetch all objects of class cls and index them by abstract id.
+
+        Exits the process with status 1 if two objects of the class produce
+        the same abstract id.
+        """
+        className = cls.getClassName()
+        if _verbose > 1:
+            print " Class:", className
+        objects = self.qmf.getObjects(_class=className,
+                                      _package=cls.getPackageName(),
+                                      _agent=self.brokerAgent)
+        # tables-by-name maps class name to a table by object-name of
+        # objects. ie use the class name ("broker", "queue", etc) to
+        # index tables-by-name, returning a second table, use the
+        # object name to index that to get an object.
+        table = {}
+        self.tablesByName[className] = table
+        for obj in objects:
+            # make sure we aren't colliding on name. it's an internal
+            # error (ie, the name-generation code is busted) if we do
+            key = self.getAbstractId(obj)
+            if key in table:
+                print "internal error: collision for %s on key %s\n" % (obj, key)
+                sys.exit(1)
+            table[key] = obj
+            if _verbose > 1:
+                print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key
+class BrokerManager:
+    """Connects to a cluster through one broker and compares all its nodes."""
+
+    def __init__(self):
+        """Start with no session, no connection and an empty node list."""
+        self.brokerName = None
+        self.qmf = None
+        self.broker = None
+        self.brokers = []
+        self.cluster = None
+
+    def SetBroker(self, brokerUrl):
+        """Open a QMF session to brokerUrl and pick the bank-0 agent."""
+        self.url = brokerUrl
+        self.qmf = Session()
+        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
+        agents = self.qmf.getAgents()
+        for a in agents:
+            if a.getAgentBank() == 0:
+                self.brokerAgent = a
+
+    def Disconnect(self):
+        """Drop the initial broker connection if it is still open."""
+        if self.broker:
+            self.qmf.delBroker(self.broker)
+
+    def _getCluster(self):
+        """Store the first cluster object in self.cluster, if there is one.
+
+        Always returns None; callers inspect self.cluster.
+        """
+        packages = self.qmf.getPackages()
+        if "org.apache.qpid.cluster" not in packages:
+            return None
+        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+        if len(clusters) == 0:
+            print "Clustering is installed but not enabled on the broker."
+            return None
+        self.cluster = clusters[0]
+
+    def _getHostList(self, urlList):
+        """Return one "host:port" string for each member URL in urlList.
+
+        Each URL must start with "amqp:" followed by comma-separated
+        three-field, colon-separated addresses; fields 2 and 3 are used as
+        host and port.  Raises Exception for any other shape.
+        """
+        hosts = []
+        hostAddr = IpAddr(_host)
+        for url in urlList:
+            if not url.startswith("amqp:"):
+                raise Exception("Invalid URL 1")
+            url = url[5:]
+            addrs = str(url).split(",")
+            addrList = []
+            for addr in addrs:
+                tokens = addr.split(":")
+                if len(tokens) != 3:
+                    raise Exception("Invalid URL 2")
+                addrList.append((tokens[1], tokens[2]))
+            # Find the address in the list that is most likely to be in the same subnet as the address
+            # with which we made the original QMF connection. This increases the probability that we will
+            # be able to reach the cluster member.
+            best = hostAddr.bestAddr(addrList)
+            bestUrl = best[0] + ":" + best[1]
+            hosts.append(bestUrl)
+        return hosts
+
+    # The main function, which tests for broker state "identity". Now that
+    # we're using qmf2 style object names across the board, that test
+    # means that we are ensuring that for all objects of a given
+    # class, an object of that class with the same object name exists
+    # on the peer broker.
+    def verify(self):
+        """Compare the management objects of every cluster node.
+
+        This method never returns: it exits the process with status 0 when
+        all nodes agree, and with status 1 when the broker is not part of a
+        cluster or when any object is missing on some node.
+        """
+        if _verbose > 0:
+            print "Connecting to the cluster..."
+        self._getCluster()
+        if self.cluster:
+            memberList = self.cluster.members.split(";")
+            hostList = self._getHostList(memberList)
+            # Replace the initial connection with one connection per member.
+            self.qmf.delBroker(self.broker)
+            self.broker = None
+            for host in hostList:
+                b = self.qmf.addBroker(host, _connTimeout)
+                self.brokers.append(Broker(self.qmf, b))
+                if _verbose > 0:
+                    print " ", b
+        else:
+            print "Failed - Not a cluster"
+            sys.exit(1)
+        failures = []
+        # Wait until connections to all nodes are established before
+        # loading the management data. This will ensure that the
+        # objects are all stable and the same.
+        if _verbose > 0:
+            print "Loading management data from nodes..."
+        for broker in self.brokers:
+            broker.getData()
+        # If we're testing delete-some-objects functionality, create a
+        # few widgets here and then delete them.
+        if _del_test:
+            if _verbose > 0:
+                print "Running delete test"
+            # just stick 'em in the first broker
+            b = self.brokers[0]
+            session = b.qmf.brokers[0].getAmqpSession()
+            session.queue_declare(queue="foo", exclusive=True, auto_delete=True)
+            session.exchange_bind(exchange="amq.direct",
+                                  queue="foo", binding_key="foo")
+            session.queue_declare(queue="bar", exclusive=True, auto_delete=True)
+            session.exchange_bind(exchange="amq.direct",
+                                  queue="bar", binding_key="bar")
+            # now delete 'em
+            session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
+            session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
+            session.queue_delete("bar")
+            session.queue_delete("foo")
+        # Verify that each node has the same set of objects (based on
+        # object name).
+        if _verbose > 0:
+            print "Verifying objects based on object name..."
+        base = self.brokers[0]
+        for broker in self.brokers[1:]:
+            # walk over the class names, for each class (with some
+            # exceptions) walk over the objects of that class, making
+            # sure they match between broker A and broker B
+            for className in base.tablesByName:
+                if className in ["broker", "system", "connection"]:
+                    continue
+                tab1 = base.tablesByName[className]
+                # A peer with no table for this class is reported object by
+                # object below instead of aborting with a KeyError.
+                tab2 = broker.tablesByName.get(className, {})
+                for key in tab1:
+                    if key not in tab2:
+                        failures.append("%s key %s not found on node %s" %
+                                        (className, key, broker.getUrl()))
+                for key in tab2:
+                    if key not in tab1:
+                        failures.append("%s key %s not found on node %s" %
+                                        (className, key, base.getUrl()))
+        if len(failures) > 0:
+            print "Failures:"
+            for failure in failures:
+                print " %s" % failure
+            sys.exit(1)
+        if _verbose > 0:
+            print "Success"
+        sys.exit(0)
+##
+## Main Program
+##
+# Parse the command line; any option error prints the usage text and exits.
+try:
+    longOpts = ("verbose=", "timeout=", "delete")
+    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
+except getopt.GetoptError:
+    Usage()
+
+# Decode the positional arguments with the locale's preferred encoding;
+# if that fails for any reason, use them undecoded.
+try:
+    encoding = locale.getpreferredencoding()
+    cargs = [a.decode(encoding) for a in encArgs]
+except Exception:
+    cargs = encArgs
+
+for opt in optlist:
+    if opt[0] == "--timeout":
+        _connTimeout = int(opt[1])
+        # A timeout of 0 is passed on as None.
+        if _connTimeout == 0:
+            _connTimeout = None
+    elif opt[0] == "--verbose":
+        _verbose = int(opt[1])
+    elif opt[0] == "--delete":
+        _del_test = True
+    else:
+        Usage()
+
+nargs = len(cargs)
+bm = BrokerManager()
+
+# A single positional argument replaces the default broker address.
+if nargs == 1:
+    _host = cargs[0]
+
+try:
+    bm.SetBroker(_host)
+    bm.verify()
+except KeyboardInterrupt:
+    print
+except Exception,e:
+    print "Failed: %s - %s" % (e.__class__.__name__, e)
+    sys.exit(1)