summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-02-05 22:47:56 +0000
committerTed Ross <tross@apache.org>2010-02-05 22:47:56 +0000
commit2b86496e7e980834464c35f49cbf7337815aeb4c (patch)
treea2e914cd6f6e7d30f9a4bf9cfc8b73fa58a03be6
parent21fbb39e510e668f8dbcf221a8d4ba8c3a84f6b7 (diff)
downloadqpid-python-2b86496e7e980834464c35f49cbf7337815aeb4c.tar.gz
QPID-2029 - Added verification script contributed by John Dunning
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@907119 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/verify_cluster_objects400
1 files changed, 400 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/verify_cluster_objects b/qpid/cpp/src/tests/verify_cluster_objects
new file mode 100644
index 0000000000..cea875662f
--- /dev/null
+++ b/qpid/cpp/src/tests/verify_cluster_objects
@@ -0,0 +1,400 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, SchemaClass
+
+_host = "localhost"
+_connTimeout = 10
+_verbose = 0
+_del_test = False;
+pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
+_debug_recursion = 0
+
+def Usage ():
+ print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]"
+ print
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ print " This program contacts every node of a cluster, loads all manageable objects from"
+ print " those nodes and verifies that the management data is identical across the clusters."
+ print
+ print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
+ print " --verbose level (0) Show details of objects and their IDs"
+ print " --delete Delete some objects after creation, to test synchup"
+ print
+ sys.exit (1)
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFFL
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class Broker(object):
+ def __init__(self, qmf, broker):
+ self.broker = broker
+ self.qmf = qmf
+
+ agents = qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
+ _agent=self.brokerAgent)[0]
+ self.currentTime = bobj.getTimestamps()[0]
+ try:
+ self.uptime = bobj.uptime
+ except:
+ self.uptime = 0
+ self.tablesByName = {}
+ self.package = "org.apache.qpid.broker"
+
+ def getUrl(self):
+ return self.broker.getUrl()
+
+ def getData(self):
+ if _verbose > 1:
+ print "Broker:", self.broker
+
+ classList = self.qmf.getClasses(self.package)
+ for cls in classList:
+ if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
+ self.loadTable(cls)
+
+
+ #
+ # this should be a method on an object, but is kept here for now, until
+ # we finish sorting out the treatment of names in qmfv2
+ #
+ def getAbstractId(self, object):
+ """ return a string the of the hierarchical name """
+ global _debug_recursion
+ result = u""
+ valstr = u""
+ _debug_recursion += 1
+ debug_prefix = _debug_recursion
+ if (_verbose > 9):
+ print debug_prefix, " enter gai: props ", self._properties
+ for property, value in object._properties:
+
+ # we want to recurse on things which are refs. we tell by
+ # asking each property if it's an index. I think...
+ if (_verbose > 9):
+ print debug_prefix, " prop ", property, " val " , value, " idx ",
+ property.index, " type ", property.type
+
+ # property is an instance, you can ask its type, name, etc.
+
+ # special case system refs, as they will never be the same on
+ # distinct cluster nodes. later we probably want a different
+ # way of representing these objects, like for instance don't
+ # include the system ref in the hierarchy.
+
+ if property.name == "systemRef":
+ _debug_recursion -= 1
+ return ""
+
+ if property.index:
+ if result != u"":
+ result += u":"
+ if property.type == 10:
+ try:
+ recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker)
+ if (_verbose > 9):
+ print debug_prefix, " r ", recursive_objects[0]
+ for rp, rv in recursive_objects[0]._properties:
+ print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv
+ print debug_prefix, " recursing on ", recursive_objects[0]
+ valstr = self.getAbstractId(recursive_objects[0])
+ if (_verbose > 9):
+ print debug_prefix, " recursing on ", recursive_objects[0],
+ " -> ", valstr
+ except Exception, e:
+ if (_verbose > 9):
+ print debug_prefix, " except ", e
+ valstr = u"<undecodable>"
+ else:
+ # this yields UUID-blah. not good. try something else
+ # valstr = value.__repr__()
+ # print debug_prefix, " val ", value
+
+ # yetch. this needs to be abstracted someplace? I don't
+ # think we have the infrastructure we need to make these id
+ # strings be sensible in the general case
+ if property.name == "systemId":
+ # special case. try to do something sensible about systemref objects
+ valstr = object.nodeName
+ else:
+ valstr = value.__repr__() # I think...
+ result += valstr
+ if (_verbose > 9):
+ print debug_prefix, " id ", self, " -> ", result
+ _debug_recursion -= 1
+ return result
+
+ def loadTable(self, cls):
+ if _verbose > 1:
+ print " Class:", cls.getClassName()
+ list = self.qmf.getObjects(_class=cls.getClassName(),
+ _package=cls.getPackageName(),
+ _agent=self.brokerAgent)
+
+ # tables-by-name maps class name to a table by object-name of
+ # objects. ie use the class name ("broker", "queue", etc) to
+ # index tables-by-name, returning a second table, use the
+ # object name to index that to get an object.
+
+ self.tablesByName[cls.getClassName()] = {}
+ for obj in list:
+ # make sure we aren't colliding on name. it's an internal
+ # error (ie, the name-generation code is busted) if we do
+ key = self.getAbstractId(obj)
+ if key in self.tablesByName[cls.getClassName()]:
+ print "internal error: collision for %s on key %s\n" % (obj, key)
+ sys.exit(1)
+
+ self.tablesByName[cls.getClassName()][self.getAbstractId(obj)] = obj
+# sys.exit(1)
+ if _verbose > 1:
+ print " ", obj.getObjectId(), " ", obj.getIndex(), " ", self.getAbstractId(obj)
+
+
+class BrokerManager:
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def _getCluster(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ return None
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ return None
+
+ self.cluster = clusters[0]
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(_host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+
+ # the main fun which tests for broker state "identity". now that
+ # we're using qmf2 style object names across the board, that test
+ # means that we are ensuring that for all objects of a given
+ # class, an object of that class with the same object name exists
+ # on the peer broker.
+
+ def verify(self):
+ if _verbose > 0:
+ print "Connecting to the cluster..."
+ self._getCluster()
+ if self.cluster:
+ memberList = self.cluster.members.split(";")
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ for host in hostList:
+ b = self.qmf.addBroker(host, _connTimeout)
+ self.brokers.append(Broker(self.qmf, b))
+ if _verbose > 0:
+ print " ", b
+ else:
+ print "Failed - Not a cluster"
+ sys.exit(1)
+
+ failures = []
+
+ # Wait until connections to all nodes are established before
+ # loading the management data. This will ensure that the
+ # objects are all stable and the same.
+ if _verbose > 0:
+ print "Loading management data from nodes..."
+ for broker in self.brokers:
+ broker.getData()
+
+ # If we're testing delete-some-objects functionality, create a
+ # few widgets here and then delete them.
+ if _del_test:
+ if _verbose > 0:
+ print "Running delete test"
+ # just stick 'em in the first broker
+ b = self.brokers[0]
+ session = b.qmf.brokers[0].getAmqpSession()
+ session.queue_declare(queue="foo", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.direct",
+ queue="foo", binding_key="foo")
+ session.queue_declare(queue="bar", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.direct",
+ queue="bar", binding_key="bar")
+ # now delete 'em
+ session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
+ session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
+ session.queue_delete("bar")
+ session.queue_delete("foo")
+
+ # Verify that each node has the same set of objects (based on
+ # object name).
+ if _verbose > 0:
+ print "Verifying objects based on object name..."
+ base = self.brokers[0]
+ for broker in self.brokers[1:]:
+
+ # walk over the class names, for each class (with some
+ # exceptions) walk over the objects of that class, making
+ # sure they match between broker A and broker B
+
+ for className in base.tablesByName:
+ if className in ["broker", "system", "connection"]:
+ continue
+
+ tab1 = base.tablesByName[className]
+ tab2 = broker.tablesByName[className]
+
+ for key in tab1:
+ if key not in tab2:
+ failures.append("%s key %s not found on node %s" %
+ (className, key, broker.getUrl()))
+ for key in tab2:
+ if key not in tab1:
+ failures.append("%s key %s not found on node %s" %
+ (className, key, base.getUrl()))
+
+ if len(failures) > 0:
+ print "Failures:"
+ for failure in failures:
+ print " %s" % failure
+ sys.exit(1)
+
+ if _verbose > 0:
+ print "Success"
+ sys.exit(0)
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("verbose=", "timeout=", "delete")
+ (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
+except:
+ Usage()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
+ elif opt[0] == "--verbose":
+ _verbose = int(opt[1])
+ elif opt[0] == "--delete":
+ _del_test = True;
+ else:
+ Usage()
+
+nargs = len(cargs)
+bm = BrokerManager()
+
+if nargs == 1:
+ _host = cargs[0]
+
+try:
+ bm.SetBroker(_host)
+ bm.verify()
+except KeyboardInterrupt:
+ print
+except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+bm.Disconnect()