diff options
author | Ted Ross <tross@apache.org> | 2010-02-05 22:47:56 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-02-05 22:47:56 +0000 |
commit | 2b86496e7e980834464c35f49cbf7337815aeb4c (patch) | |
tree | a2e914cd6f6e7d30f9a4bf9cfc8b73fa58a03be6 | |
parent | 21fbb39e510e668f8dbcf221a8d4ba8c3a84f6b7 (diff) | |
download | qpid-python-2b86496e7e980834464c35f49cbf7337815aeb4c.tar.gz |
QPID-2029 - Added verification script contributed by John Dunning
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@907119 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/verify_cluster_objects | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/verify_cluster_objects b/qpid/cpp/src/tests/verify_cluster_objects new file mode 100644 index 0000000000..cea875662f --- /dev/null +++ b/qpid/cpp/src/tests/verify_cluster_objects @@ -0,0 +1,400 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session, SchemaClass + +_host = "localhost" +_connTimeout = 10 +_verbose = 0 +_del_test = False; +pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") +_debug_recursion = 0 + +def Usage (): + print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print " This program contacts every node of a cluster, loads all manageable objects from" + print " those nodes and verifies that the management data is identical across the clusters." + print + print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" + print " --verbose level (0) Show details of objects and their IDs" + print " --delete Delete some objects after creation, to test synchup" + print + sys.exit (1) + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFFL + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class Broker(object): + def __init__(self, qmf, broker): + self.broker = broker + self.qmf = qmf + + agents = qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", + _agent=self.brokerAgent)[0] + self.currentTime = bobj.getTimestamps()[0] + try: + self.uptime = bobj.uptime + except: + self.uptime = 0 + self.tablesByName = {} + self.package = "org.apache.qpid.broker" + + def getUrl(self): + return self.broker.getUrl() + + def getData(self): + if _verbose > 1: + print "Broker:", self.broker + + classList = self.qmf.getClasses(self.package) + for cls in classList: + if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE: + self.loadTable(cls) + + + # + # this should be a method on an object, but is kept here for now, until + # we finish sorting out the treatment of names in qmfv2 + # + def getAbstractId(self, object): + """ return a string the of the hierarchical name """ + global _debug_recursion + result = u"" + valstr = u"" + _debug_recursion += 1 + debug_prefix = _debug_recursion + if (_verbose > 9): + print debug_prefix, " enter gai: props ", self._properties + for property, value in object._properties: + + # we want to recurse on things which are refs. we tell by + # asking each property if it's an index. I think... + if (_verbose > 9): + print debug_prefix, " prop ", property, " val " , value, " idx ", + property.index, " type ", property.type + + # property is an instance, you can ask its type, name, etc. + + # special case system refs, as they will never be the same on + # distinct cluster nodes. later we probably want a different + # way of representing these objects, like for instance don't + # include the system ref in the hierarchy. + + if property.name == "systemRef": + _debug_recursion -= 1 + return "" + + if property.index: + if result != u"": + result += u":" + if property.type == 10: + try: + recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker) + if (_verbose > 9): + print debug_prefix, " r ", recursive_objects[0] + for rp, rv in recursive_objects[0]._properties: + print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv + print debug_prefix, " recursing on ", recursive_objects[0] + valstr = self.getAbstractId(recursive_objects[0]) + if (_verbose > 9): + print debug_prefix, " recursing on ", recursive_objects[0], + " -> ", valstr + except Exception, e: + if (_verbose > 9): + print debug_prefix, " except ", e + valstr = u"<undecodable>" + else: + # this yields UUID-blah. not good. try something else + # valstr = value.__repr__() + # print debug_prefix, " val ", value + + # yetch. this needs to be abstracted someplace? I don't + # think we have the infrastructure we need to make these id + # strings be sensible in the general case + if property.name == "systemId": + # special case. try to do something sensible about systemref objects + valstr = object.nodeName + else: + valstr = value.__repr__() # I think... + result += valstr + if (_verbose > 9): + print debug_prefix, " id ", self, " -> ", result + _debug_recursion -= 1 + return result + + def loadTable(self, cls): + if _verbose > 1: + print " Class:", cls.getClassName() + list = self.qmf.getObjects(_class=cls.getClassName(), + _package=cls.getPackageName(), + _agent=self.brokerAgent) + + # tables-by-name maps class name to a table by object-name of + # objects. ie use the class name ("broker", "queue", etc) to + # index tables-by-name, returning a second table, use the + # object name to index that to get an object. + + self.tablesByName[cls.getClassName()] = {} + for obj in list: + # make sure we aren't colliding on name. it's an internal + # error (ie, the name-generation code is busted) if we do + key = self.getAbstractId(obj) + if key in self.tablesByName[cls.getClassName()]: + print "internal error: collision for %s on key %s\n" % (obj, key) + sys.exit(1) + + self.tablesByName[cls.getClassName()][self.getAbstractId(obj)] = obj +# sys.exit(1) + if _verbose > 1: + print " ", obj.getObjectId(), " ", obj.getIndex(), " ", self.getAbstractId(obj) + + +class BrokerManager: + def __init__(self): + self.brokerName = None + self.qmf = None + self.broker = None + self.brokers = [] + self.cluster = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getCluster(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + return None + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + return None + + self.cluster = clusters[0] + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(_host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + + # the main fun which tests for broker state "identity". now that + # we're using qmf2 style object names across the board, that test + # means that we are ensuring that for all objects of a given + # class, an object of that class with the same object name exists + # on the peer broker. + + def verify(self): + if _verbose > 0: + print "Connecting to the cluster..." + self._getCluster() + if self.cluster: + memberList = self.cluster.members.split(";") + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + for host in hostList: + b = self.qmf.addBroker(host, _connTimeout) + self.brokers.append(Broker(self.qmf, b)) + if _verbose > 0: + print " ", b + else: + print "Failed - Not a cluster" + sys.exit(1) + + failures = [] + + # Wait until connections to all nodes are established before + # loading the management data. This will ensure that the + # objects are all stable and the same. + if _verbose > 0: + print "Loading management data from nodes..." + for broker in self.brokers: + broker.getData() + + # If we're testing delete-some-objects functionality, create a + # few widgets here and then delete them. + if _del_test: + if _verbose > 0: + print "Running delete test" + # just stick 'em in the first broker + b = self.brokers[0] + session = b.qmf.brokers[0].getAmqpSession() + session.queue_declare(queue="foo", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="amq.direct", + queue="foo", binding_key="foo") + session.queue_declare(queue="bar", exclusive=True, auto_delete=True) + session.exchange_bind(exchange="amq.direct", + queue="bar", binding_key="bar") + # now delete 'em + session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo") + session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar") + session.queue_delete("bar") + session.queue_delete("foo") + + # Verify that each node has the same set of objects (based on + # object name). + if _verbose > 0: + print "Verifying objects based on object name..." + base = self.brokers[0] + for broker in self.brokers[1:]: + + # walk over the class names, for each class (with some + # exceptions) walk over the objects of that class, making + # sure they match between broker A and broker B + + for className in base.tablesByName: + if className in ["broker", "system", "connection"]: + continue + + tab1 = base.tablesByName[className] + tab2 = broker.tablesByName[className] + + for key in tab1: + if key not in tab2: + failures.append("%s key %s not found on node %s" % + (className, key, broker.getUrl())) + for key in tab2: + if key not in tab1: + failures.append("%s key %s not found on node %s" % + (className, key, base.getUrl())) + + if len(failures) > 0: + print "Failures:" + for failure in failures: + print " %s" % failure + sys.exit(1) + + if _verbose > 0: + print "Success" + sys.exit(0) + +## +## Main Program +## + +try: + longOpts = ("verbose=", "timeout=", "delete") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts) +except: + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "--timeout": + _connTimeout = int(opt[1]) + if _connTimeout == 0: + _connTimeout = None + elif opt[0] == "--verbose": + _verbose = int(opt[1]) + elif opt[0] == "--delete": + _del_test = True; + else: + Usage() + +nargs = len(cargs) +bm = BrokerManager() + +if nargs == 1: + _host = cargs[0] + +try: + bm.SetBroker(_host) + bm.verify() +except KeyboardInterrupt: + print +except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + sys.exit(1) + +bm.Disconnect() |