diff options
Diffstat (limited to 'qpid/cpp/management/python/bin/qpid-route')
-rwxr-xr-x | qpid/cpp/management/python/bin/qpid-route | 635 |
1 files changed, 635 insertions, 0 deletions
diff --git a/qpid/cpp/management/python/bin/qpid-route b/qpid/cpp/management/python/bin/qpid-route new file mode 100755 index 0000000000..f51d2493e9 --- /dev/null +++ b/qpid/cpp/management/python/bin/qpid-route @@ -0,0 +1,635 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from optparse import OptionParser, OptionGroup, IndentedHelpFormatter +import sys +import os +import locale +from qmf.console import Session, BrokerURL +from time import sleep + +usage = """ +Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism] + qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange> + + qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism] + qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key> + qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism] + qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue> + qpid-route [OPTIONS] route list [<dest-broker>] + qpid-route [OPTIONS] route flush [<dest-broker>] + qpid-route [OPTIONS] route map [<broker>] + + qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism] + qpid-route [OPTIONS] link del <dest-broker> <src-broker> + qpid-route [OPTIONS] link list [<dest-broker>]""" + +description = """ +ADDRESS syntax: + + [username/password@] hostname + ip-address [:<port>]""" + +def Usage(): + print usage + +class Config: + def __init__(self): + self._verbose = False + self._quiet = False + self._durable = False + self._dellink = False + self._srclocal = False + self._transport = "tcp" + self._ack = 0 + self._credit = 0xFFFFFFFF # unlimited + self._connTimeout = 10 + self._conn_options = {} + +config = Config() + +class JHelpFormatter(IndentedHelpFormatter): + """Format usage and description without stripping newlines from usage strings + """ + + def format_usage(self, usage): + return usage + + + def format_description(self, description): + if description: + return description + "\n" + else: + return "" + +def OptionsAndArguments(argv): + parser = OptionParser(usage=usage, + description=description, + formatter=JHelpFormatter()) + + parser.add_option("--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") + parser.add_option("-v", "--verbose", action="store_true", help="Verbose output") + parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings") + parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable") + + parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link") + parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)") + + parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N") + parser.add_option("--credit", action="store", type="int", default=0xFFFFFFFF, metavar="<msgs>", + help="Maximum number of messages a sender can have outstanding (0=unlimited)") + parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") + + parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") + parser.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use") + parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)") + parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)") + parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") + opts, encArgs = parser.parse_args(args=argv) + + try: + encoding = locale.getpreferredencoding() + args = [a.decode(encoding) for a in encArgs] + except: + args = encArgs + + if opts.timeout: + config._connTimeout = opts.timeout + if config._connTimeout == 0: + config._connTimeout = None + + if opts.verbose: + config._verbose = True + + if opts.quiet: + config._quiet = True + + if opts.durable: + config._durable = True + + if opts.del_empty_link: + config._dellink = True + + if opts.src_local: + config._srclocal = True + + if opts.transport: + config._transport = opts.transport + + if opts.ha_admin: + config._conn_options['client_properties'] = {'qpid.ha-admin' : 1} + + if opts.ack: + config._ack = opts.ack + + config._credit = opts.credit + + if opts.client_sasl_mechanism: + config._conn_options['mechanisms'] = opts.client_sasl_mechanism + if opts.sasl_service_name: + config._conn_options['service'] = opts.sasl_service_name + + if opts.ssl_certificate: + config._conn_options['ssl_certfile'] = opts.ssl_certificate + + if opts.ssl_key: + if not opts.ssl_certificate: + parser.error("missing '--ssl-certificate' (required by '--ssl-key')") + config._conn_options['ssl_keyfile'] = opts.ssl_key + + return args + + +class RouteManager: + def __init__(self, localBroker): + self.brokerList = {} + self.local = BrokerURL(localBroker) + self.remote = None + self.qmf = Session() + self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options) + self.broker._waitForStable() + self.agent = self.broker.getBrokerAgent() + + def disconnect(self): + try: + if self.broker: + self.qmf.delBroker(self.broker) + self.broker = None + while len(self.brokerList): + b = self.brokerList.popitem() + if b[0] != self.local.name(): + self.qmf.delBroker(b[1]) + except: + pass # ignore errors while shutting down + + def getLink(self): + links = self.agent.getObjects(_class="link") + for link in links: + if self.remote.match(link.host, link.port): + return link + return None + + def checkLink(self, link): + retry = 3 + while link is None or (link.state in ("Waiting", "Connecting", "Closing") and retry > 0): + sleep(1) + link = self.getLink() + retry -= 1 + + if link == None: + raise Exception("Link failed to create") + + if link.state == "Failed": + raise Exception("Link failed to create %s" % (link.lastError or "")) + elif config._verbose: + print "Link state is", link.state + + def addLink(self, remoteBroker, interbroker_mechanism=""): + self.remote = BrokerURL(remoteBroker) + if self.local.match(self.remote.host, self.remote.port): + raise Exception("Linking broker to itself is not permitted") + + brokers = self.agent.getObjects(_class="broker") + broker = brokers[0] + link = self.getLink() + if link == None: + res = broker.connect(self.remote.host, self.remote.port, config._durable, + interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "", + config._transport) + + def delLink(self, remoteBroker): + self.remote = BrokerURL(remoteBroker) + brokers = self.agent.getObjects(_class="broker") + broker = brokers[0] + link = self.getLink() + if link == None: + raise Exception("Link not found") + + res = link.close() + if config._verbose: + print "Close method returned:", res.status, res.text + + def listLinks(self): + links = self.agent.getObjects(_class="link") + if len(links) == 0: + print "No Links Found" + else: + print + print "Host Port Transport Durable State Last Error" + print "=============================================================================" + for link in links: + print "%-16s%-8d%-13s%c %-18s%s" % \ + (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError) + + def mapRoutes(self): + print + print "Finding Linked Brokers:" + + self.brokerList[self.local.name()] = self.broker + print " %s:%s... Ok" % (self.local.host, self.local.port) + + added = True + while added: + added = False + links = self.qmf.getObjects(_class="link") + for link in links: + url = BrokerURL(host=link.host, port=link.port, user=self.broker.authUser, password=self.broker.authPass) + if url.name() not in self.brokerList: + print " %s:%s..." % (link.host, link.port) + try: + url.authName = self.local.authName + url.authPass = self.local.authPass + b = self.qmf.addBroker(url, config._connTimeout, **config._conn_options) + self.brokerList[url.name()] = b + added = True + print "Ok" + except Exception, e: + print e + + print + print "Dynamic Routes:" + bridges = self.qmf.getObjects(_class="bridge", dynamic=True) + fedExchanges = [] + for bridge in bridges: + if bridge.src not in fedExchanges: + fedExchanges.append(bridge.src) + if len(fedExchanges) == 0: + print " none found" + print + + for ex in fedExchanges: + print " Exchange %s:" % ex + pairs = [] + for bridge in bridges: + if bridge.src == ex: + link = bridge._linkRef_ + fromUrl = BrokerURL(host=link.host, port=link.port) + toUrl = bridge.getBroker().getUrl() + found = False + for pair in pairs: + if pair.matches(fromUrl, toUrl): + found = True + if not found: + pairs.append(RoutePair(fromUrl, toUrl)) + for pair in pairs: + print " %s" % pair + print + + print "Static Routes:" + bridges = self.qmf.getObjects(_class="bridge", dynamic=False) + if len(bridges) == 0: + print " none found" + print + + for bridge in bridges: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + leftType = "ex" + rightType = "ex" + if bridge.srcIsLocal: + arrow = "=>" + left = bridge.src + right = bridge.dest + if bridge.srcIsQueue: + leftType = "queue" + else: + arrow = "<=" + left = bridge.dest + right = bridge.src + if bridge.srcIsQueue: + rightType = "queue" + + if bridge.srcIsQueue: + print " %s(%s=%s) %s %s(%s=%s)" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right) + else: + print " %s(%s=%s) %s %s(%s=%s) key=%s" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) + print + + while len(self.brokerList): + b = self.brokerList.popitem() + if b[0] != self.local.name(): + self.qmf.delBroker(b[1]) + + def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False): + if dynamic and config._srclocal: + raise Exception("--src-local is not permitted on dynamic routes") + + self.addLink(remoteBroker, interbroker_mechanism) + link = self.getLink() + self.checkLink(link) + + bridges = self.agent.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: + if not config._quiet: + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) + sys.exit(0) + + if config._verbose: + print "Creating inter-broker binding..." + res = link.bridge(config._durable, exchange, exchange, routingKey, tag, + excludes, False, config._srclocal, dynamic, + config._ack, credit=config._credit) + if res.status != 0: + raise Exception(res.text) + if config._verbose: + print "Bridge method returned:", res.status, res.text + + def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ): + self.addLink(remoteBroker, interbroker_mechanism) + link = self.getLink() + self.checkLink(link) + + bridges = self.agent.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: + if not config._quiet: + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) + sys.exit(0) + + if config._verbose: + print "Creating inter-broker binding..." + res = link.bridge(config._durable, queue, exchange, "", "", "", True, + config._srclocal, False, config._ack, credit=config._credit) + if res.status != 0: + raise Exception(res.text) + if config._verbose: + print "Bridge method returned:", res.status, res.text + + def delQueueRoute(self, remoteBroker, exchange, queue): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() + if link == None: + if not config._quiet: + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) + + bridges = self.agent.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: + if config._verbose: + print "Closing bridge..." + res = bridge.close() + if res.status != 0: + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and config._dellink: + link = self.getLink() + if link == None: + sys.exit(0) + if config._verbose: + print "Last bridge on link, closing link..." + res = link.close() + if res.status != 0: + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) + if not config._quiet: + raise Exception("Route not found") + + def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() + if link == None: + if not config._quiet: + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) + + bridges = self.agent.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ + and bridge.dynamic == dynamic: + if config._verbose: + print "Closing bridge..." + res = bridge.close() + if res.status != 0: + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and config._dellink: + link = self.getLink() + if link == None: + sys.exit(0) + if config._verbose: + print "Last bridge on link, closing link..." + res = link.close() + if res.status != 0: + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + return + if not config._quiet: + raise Exception("Route not found") + + def listRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") + + for bridge in bridges: + myLink = None + for link in links: + if bridge.linkRef == link.getObjectId(): + myLink = link + break + if myLink != None: + if bridge.dynamic: + keyText = "<dynamic>" + else: + keyText = bridge.key + print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText) + + def clearAllRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") + + for bridge in bridges: + if config._verbose: + myLink = None + for link in links: + if bridge.linkRef == link.getObjectId(): + myLink = link + break + if myLink != None: + print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), + res = bridge.close() + if res.status != 0: + print "Error: %d - %s" % (res.status, res.text) + elif config._verbose: + print "Ok" + + if config._dellink: + links = self.qmf.getObjects(_class="link") + for link in links: + if config._verbose: + print "Deleting Link: %s:%d... " % (link.host, link.port), + res = link.close() + if res.status != 0: + print "Error: %d - %s" % (res.status, res.text) + elif config._verbose: + print "Ok" + +class RoutePair: + def __init__(self, fromUrl, toUrl): + self.fromUrl = fromUrl + self.toUrl = toUrl + self.bidir = False + + def __repr__(self): + if self.bidir: + delimit = "<=>" + else: + delimit = " =>" + return "%s %s %s" % (self.fromUrl, delimit, self.toUrl) + + def matches(self, fromUrl, toUrl): + if fromUrl == self.fromUrl and toUrl == self.toUrl: + return True + if toUrl == self.fromUrl and fromUrl == self.toUrl: + self.bidir = True + return True + return False + + +def YN(val): + if val == 1: + return 'Y' + return 'N' + + +def main(argv=None): + + args = OptionsAndArguments(argv) + nargs = len(args) + if nargs < 2: + Usage() + return(-1) + + if nargs == 2: + localBroker = "localhost" + else: + if config._srclocal: + localBroker = args[3] + remoteBroker = args[2] + else: + localBroker = args[2] + if nargs > 3: + remoteBroker = args[3] + + group = args[0] + cmd = args[1] + + rm = None + try: + rm = RouteManager(localBroker) + if group == "link": + if cmd == "add": + if nargs < 3 or nargs > 5: + Usage() + return(-1) + interbroker_mechanism = "" + if nargs > 4: interbroker_mechanism = args[4] + rm.addLink(remoteBroker, interbroker_mechanism) + rm.checkLink(rm.getLink()) + elif cmd == "del": + if nargs != 4: + Usage() + return(-1) + rm.delLink(remoteBroker) + elif cmd == "list": + rm.listLinks() + + elif group == "dynamic": + if cmd == "add": + if nargs < 5 or nargs > 8: + Usage() + return(-1) + + tag = "" + excludes = "" + interbroker_mechanism = "" + if nargs > 5: tag = args[5] + if nargs > 6: excludes = args[6] + if nargs > 7: interbroker_mechanism = args[7] + rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True) + elif cmd == "del": + if nargs != 5: + Usage() + return(-1) + else: + rm.delRoute(remoteBroker, args[4], "", dynamic=True) + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 9: + Usage() + return(-1) + + tag = "" + excludes = "" + interbroker_mechanism = "" + if nargs > 6: tag = args[6] + if nargs > 7: excludes = args[7] + if nargs > 8: interbroker_mechanism = args[8] + rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False) + elif cmd == "del": + if nargs != 6: + Usage() + return(-1) + rm.delRoute(remoteBroker, args[4], args[5], dynamic=False) + elif cmd == "map": + rm.mapRoutes() + else: + if cmd == "list": + rm.listRoutes() + elif cmd == "flush": + rm.clearAllRoutes() + else: + Usage() + return(-1) + + elif group == "queue": + if nargs < 6 or nargs > 7: + Usage() + return(-1) + if cmd == "add": + interbroker_mechanism = "" + if nargs > 6: interbroker_mechanism = args[6] + rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] ) + elif cmd == "del": + rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) + else: + Usage() + return(-1) + else: + Usage() + return(-1) + + except Exception,e: + if rm: + rm.disconnect() # try to release broker resources + print "Failed: %s - %s" % (e.__class__.__name__, e) + return 1 + + rm.disconnect() + return 0 + +if __name__ == "__main__": + sys.exit(main()) |