#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
import getopt
import sys
import locale
import socket
import re
from qmf.console import Session, SchemaClass

# --- Settings that the command-line parsing at the bottom of the file may override ---

# Broker address to contact when no broker-addr argument is given.
_host = "localhost"
# Maximum time, in seconds, to wait for a broker connection (--timeout).
_connTimeout = 10
# Verbosity level (--verbose); higher values print more detail.
_verbose = 0
# True when --delete was given: create and delete some objects during verify.
_del_test = False

# --- Internal state ---

# Nesting depth of Broker.getAbstractId calls; used as a prefix in its debug output.
_debug_recursion = 0
# Matches "<dotted-quad>:<port>".
# NOTE(review): not referenced anywhere else in the visible script.
pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")

def Usage():
    """Print the command-line help text on stdout, then exit with status 1."""
    help_lines = (
        "Usage:  verify_cluster_objects [OPTIONS] [broker-addr]",
        "",
        "             broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]",
        "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
        "    This program contacts every node of a cluster, loads all manageable objects from",
        "    those nodes and verifies that the management data is identical across the clusters.",
        "",
        "Options:",
        "    --timeout seconds (10)  Maximum time to wait for broker connection",
        "    --verbose level (0)     Show details of objects and their IDs",
        "    --delete                Delete some objects after creation, to test synchup",
        "",
    )
    # One print of the joined text emits exactly the same bytes as printing
    # each line separately (the final "" yields the trailing blank line).
    print("\n".join(help_lines))
    sys.exit(1)

class IpAddr:
    """An IPv4 address and port parsed from a broker address string.

    The accepted text has the form "[user/password@]host[:port]".  After
    construction the instance carries:

      port       -- the port given in the text, or 5672 when none is given
      dottedQuad -- the host resolved with socket.gethostbyname()
      addr       -- dottedQuad packed into a single 32-bit integer
    """

    def __init__(self, text):
        # Drop the credentials.  Split on the LAST "@" so that a password
        # which itself contains "@" cannot be mistaken for the host.
        if "@" in text:
            text = text.rsplit("@", 1)[1]
        if ":" in text:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        # Fold the four octets into one integer, most significant octet first.
        addr = 0
        for octet in self.dottedQuad.split("."):
            addr = (addr << 8) + int(octet)
        self.addr = addr

    def bestAddr(self, addrPortList):
        """Return the entry of addrPortList whose address is closest to ours.

        Each entry is a sequence whose first item is a host or address.
        Closeness is the XOR of the two 32-bit addresses, so entries sharing
        the longest leading bit prefix with this address win; on a tie the
        earliest entry is kept.  Returns None only for an empty list.
        """
        bestDiff = None
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            # "is None" accepts the first candidate unconditionally, so even
            # a candidate differing in all 32 bits is returned rather than None.
            if bestDiff is None or diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr

class Broker(object):
    """The manageable objects of one broker, indexed by class and abstract id.

    Attributes set by the constructor:

      broker       -- the broker handle passed in by the caller
      qmf          -- the QMF session passed in by the caller
      brokerAgent  -- the agent whose agent bank is 0
      currentTime  -- first timestamp of the "broker" management object
      uptime       -- uptime of that object, or 0 when it cannot be read
      package      -- package whose table classes getData() loads
      tablesByName -- class name -> {abstract id -> object}; filled by getData()
    """

    def __init__(self, qmf, broker):
        self.broker = broker
        self.qmf = qmf
        self.tablesByName = {}
        self.package = "org.apache.qpid.broker"

        # Pick the agent in bank 0; when several are listed the last one wins.
        self.brokerAgent = None
        for a in qmf.getAgents():
            if a.getAgentBank() == 0:
                self.brokerAgent = a
        if self.brokerAgent is None:
            # Fail with a clear message instead of an AttributeError below.
            raise Exception("No broker agent (agent bank 0) found for %s" % (broker,))

        bobj = qmf.getObjects(_class="broker", _package=self.package,
                              _agent=self.brokerAgent)[0]
        self.currentTime = bobj.getTimestamps()[0]
        try:
            self.uptime = bobj.uptime
        except Exception:
            # Best effort: the object may not expose an uptime attribute.
            self.uptime = 0

    def getUrl(self):
        """Return the URL reported by the underlying broker handle."""
        return self.broker.getUrl()

    def getData(self):
        """Load every table-kind class of self.package into tablesByName."""
        if _verbose > 1:
            print("Broker: %s" % (self.broker,))

        classList = self.qmf.getClasses(self.package)
        for cls in classList:
            if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
                self.loadTable(cls)

    #
    # this should be a method on an object, but is kept here for now, until
    # we finish sorting out the treatment of names in qmfv2
    #
    def getAbstractId(self, object):
        """Return a string holding the hierarchical name of *object*.

        The properties in object._properties are visited in order and every
        index property contributes one component; components are joined
        with ":".  A component is:

          * for a property of type code 10 (handled here as a reference to
            another object): the abstract id of the referenced object, or
            "<undecodable>" when it cannot be fetched or named;
          * for a property named "systemId": object.nodeName;
          * otherwise: repr() of the property value.

        As soon as a property named "systemRef" is met, "" is returned,
        because system references are never the same on distinct nodes.
        """
        global _debug_recursion
        _debug_recursion += 1
        debug_prefix = _debug_recursion
        try:
            return self._buildAbstractId(object, debug_prefix)
        finally:
            # Restore the depth counter on every exit path, including errors.
            _debug_recursion -= 1

    def _buildAbstractId(self, object, debug_prefix):
        """Assemble the abstract id; debug_prefix labels the debug output."""
        debug = _verbose > 9
        result = u""
        if debug:
            print("%s   enter gai: props  %s" % (debug_prefix, object._properties))
        for property, value in object._properties:
            if debug:
                print("%s   prop  %s  val  %s  idx  %s  type  %s" %
                      (debug_prefix, property, value, property.index, property.type))

            # special case system refs, as they will never be the same on
            # distinct cluster nodes.  later we probably want a different
            # way of representing these objects, like for instance don't
            # include the system ref in the hierarchy.
            if property.name == "systemRef":
                return ""

            # Only index properties take part in the name.
            if not property.index:
                continue

            if property.type == 10:
                valstr = self._resolveReference(object, value, debug_prefix)
            elif property.name == "systemId":
                # special case.  try to do something sensible about systemref objects
                valstr = object.nodeName
            else:
                valstr = repr(value)

            if result != u"":
                result += u":"
            result += valstr
            if debug:
                print("%s     id  %s  ->  %s" % (debug_prefix, object, result))
        return result

    def _resolveReference(self, object, value, debug_prefix):
        """Return the abstract id of the object that *value* refers to.

        The referenced object is fetched through object._session; any
        failure while fetching or naming it yields "<undecodable>".
        """
        debug = _verbose > 9
        try:
            referenced = object._session.getObjects(_objectId=value,
                                                    _broker=object._broker)[0]
            if debug:
                print("%s    r  %s" % (debug_prefix, referenced))
                for rp, rv in referenced._properties:
                    print("%s    rrr  %s  idx-p  %s  v  %s" %
                          (debug_prefix, rp, rp.index, rv))
                print("%s     recursing on  %s" % (debug_prefix, referenced))
            valstr = self.getAbstractId(referenced)
            if debug:
                print("%s     recursing on  %s  ->  %s" %
                      (debug_prefix, referenced, valstr))
            return valstr
        except Exception:
            if debug:
                print("%s           except  %s" % (debug_prefix, sys.exc_info()[1]))
            return u"<undecodable>"

    def loadTable(self, cls):
        """Fetch all objects of class *cls* and index them by abstract id.

        tables-by-name maps class name to a table by object-name of
        objects.  ie use the class name ("broker", "queue", etc) to index
        tables-by-name, returning a second table, use the object name to
        index that to get an object.

        Prints an error and exits with status 1 when two objects of the
        class produce the same abstract id.
        """
        className = cls.getClassName()
        if _verbose > 1:
            print("  Class: %s" % (className,))
        objects = self.qmf.getObjects(_class=className,
                                      _package=cls.getPackageName(),
                                      _agent=self.brokerAgent)

        table = {}
        self.tablesByName[className] = table
        for obj in objects:
            # Compute the id once: it may need lookups of referenced objects.
            key = self.getAbstractId(obj)
            # make sure we aren't colliding on name.  it's an internal
            # error (ie, the name-generation code is busted) if we do
            if key in table:
                print("internal error: collision for %s on key %s\n" % (obj, key))
                sys.exit(1)

            table[key] = obj
            if _verbose > 1:
                print("    %s   %s   %s" % (obj.getObjectId(), obj.getIndex(), key))


class BrokerManager:
    """Drives the check: connect to a broker, find its cluster, compare nodes.

    Attributes:

      url         -- broker URL given to SetBroker()
      qmf         -- QMF session created by SetBroker()
      broker      -- handle of the initially contacted broker (None once
                     verify() has replaced it by the member connections)
      brokerAgent -- agent in bank 0 found by SetBroker(), or None
      brokers     -- Broker wrappers, one per cluster member, built by verify()
      cluster     -- the cluster management object, or None
    """

    def __init__(self):
        self.brokerName  = None
        self.url         = None
        self.qmf         = None
        self.broker      = None
        self.brokerAgent = None
        self.brokers     = []
        self.cluster     = None

    def SetBroker(self, brokerUrl):
        """Open a QMF session to brokerUrl and remember its bank-0 agent."""
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        for a in self.qmf.getAgents():
            if a.getAgentBank() == 0:
                self.brokerAgent = a

    def Disconnect(self):
        """Remove every broker connection this manager added to the session.

        Covers both the initial broker and the cluster members connected by
        verify(); safe to call when nothing is connected.
        """
        if self.broker:
            self.qmf.delBroker(self.broker)
            self.broker = None
        # Detach the list first so a second call does not delete twice.
        members = self.brokers
        self.brokers = []
        for member in members:
            self.qmf.delBroker(member.broker)

    def _getCluster(self):
        """Set self.cluster to the cluster object, if the broker has one.

        Leaves self.cluster untouched when the cluster package is unknown to
        the session or no cluster object exists.  Always returns None.
        """
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        if self.brokerAgent is None:
            # Clear error instead of an obscure failure inside getObjects().
            raise Exception("No broker agent (agent bank 0) found on %s" % (self.url,))

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print("Clustering is installed but not enabled on the broker.")
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        """Return one "host:port" string per member URL in urlList.

        Each URL must start with "amqp:" followed by comma-separated
        three-field addresses (fields separated by ":"); the second and
        third fields are used as host and port.  Raises Exception for
        anything else.
        """
        hosts = []
        hostAddr = IpAddr(_host)
        for url in urlList:
            if not url.startswith("amqp:"):
                raise Exception("Invalid URL 1")
            addrList = []
            for addr in str(url[5:]).split(","):
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the same subnet as the address
            # with which we made the original QMF connection.  This increases the probability that we will
            # be able to reach the cluster member.
            best = hostAddr.bestAddr(addrList)
            hosts.append(best[0] + ":" + best[1])
        return hosts

    def _connectMembers(self):
        """Swap the initial connection for one connection per cluster member."""
        memberList = self.cluster.members.split(";")
        hostList = self._getHostList(memberList)
        self.qmf.delBroker(self.broker)
        self.broker = None
        for host in hostList:
            b = self.qmf.addBroker(host, _connTimeout)
            self.brokers.append(Broker(self.qmf, b))
            if _verbose > 0:
                print("    %s" % (b,))

    def _runDeleteTest(self):
        """Create two bound queues on the first node, then remove them again."""
        # just stick 'em in the first broker
        b = self.brokers[0]
        session = b.qmf.brokers[0].getAmqpSession()
        for name in ("foo", "bar"):
            session.queue_declare(queue=name, exclusive=True, auto_delete=True)
            session.exchange_bind(exchange="amq.direct",
                                  queue=name, binding_key=name)
        # now delete 'em
        session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
        session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
        session.queue_delete("bar")
        session.queue_delete("foo")

    def _compareTables(self, base, broker):
        """Return failure messages for object names not shared by two nodes.

        walk over the class names, for each class (with some exceptions)
        walk over the objects of that class, making sure they match between
        broker A and broker B.
        """
        failures = []
        for className in base.tablesByName:
            if className in ["broker", "system", "connection"]:
                continue

            tab1 = base.tablesByName[className]
            # A class the peer never loaded counts as an empty table, so
            # its objects are reported instead of raising KeyError.
            tab2 = broker.tablesByName.get(className, {})

            for key in tab1:
                if key not in tab2:
                    failures.append("%s key %s not found on node %s" %
                                    (className, key, broker.getUrl()))
            for key in tab2:
                if key not in tab1:
                    failures.append("%s key %s not found on node %s" %
                                    (className, key, base.getUrl()))
        return failures

    # the main fun which tests for broker state "identity".  now that
    # we're using qmf2 style object names across the board, that test
    # means that we are ensuring that for all objects of a given
    # class, an object of that class with the same object name exists
    # on the peer broker.

    def verify(self):
        """Compare the object names of all cluster nodes and exit.

        Never returns normally: exits with status 0 when every node holds
        the same object names, and with status 1 when the broker is not
        part of a cluster or any difference is found.
        """
        if _verbose > 0:
            print("Connecting to the cluster...")
        self._getCluster()
        if not self.cluster:
            print("Failed - Not a cluster")
            sys.exit(1)
        self._connectMembers()

        # Wait until connections to all nodes are established before
        # loading the management data.  This will ensure that the
        # objects are all stable and the same.
        if _verbose > 0:
            print("Loading management data from nodes...")
        for broker in self.brokers:
            broker.getData()

        # If we're testing delete-some-objects functionality, create a
        # few widgets here and then delete them.
        if _del_test:
            if _verbose > 0:
                print("Running delete test")
            self._runDeleteTest()

        # Verify that each node has the same set of objects (based on
        # object name).
        if _verbose > 0:
            print("Verifying objects based on object name...")
        failures = []
        base = self.brokers[0]
        for broker in self.brokers[1:]:
            failures.extend(self._compareTables(base, broker))

        if len(failures) > 0:
            print("Failures:")
            for failure in failures:
                print("  %s" % (failure,))
            sys.exit(1)

        if _verbose > 0:
            print("Success")
        sys.exit(0)

##
## Main Program
##

# Parse the command line.  Only a getopt failure (unknown option or missing
# value) leads to the usage text; other exceptions are not swallowed.
try:
    longOpts = ("verbose=", "timeout=", "delete")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
except getopt.GetoptError:
    Usage()

# Best effort: decode the positional arguments with the preferred encoding
# and fall back to the raw arguments when that is not possible.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

for opt in optlist:
    try:
        if opt[0] == "--timeout":
            _connTimeout = int(opt[1])
            # A timeout of 0 means "no timeout".
            if _connTimeout == 0:
                _connTimeout = None
        elif opt[0] == "--verbose":
            _verbose = int(opt[1])
        elif opt[0] == "--delete":
            _del_test = True
        else:
            Usage()
    except ValueError:
        # Non-numeric value for --timeout or --verbose.
        Usage()

nargs = len(cargs)
bm    = BrokerManager()

# The optional single positional argument is the broker address.
if nargs == 1:
    _host = cargs[0]

try:
    bm.SetBroker(_host)
    bm.verify()
except KeyboardInterrupt:
    print("")
except Exception:
    # sys.exc_info() instead of "except Exception, e" keeps this statement
    # valid on every Python version.
    e = sys.exc_info()[1]
    print("Failed: %s - %s" % (e.__class__.__name__, e))
    sys.exit(1)

bm.Disconnect()
