summaryrefslogtreecommitdiff
path: root/qpid/cpp/management/python/bin/qpid-route
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/management/python/bin/qpid-route')
-rwxr-xr-xqpid/cpp/management/python/bin/qpid-route635
1 files changed, 635 insertions, 0 deletions
diff --git a/qpid/cpp/management/python/bin/qpid-route b/qpid/cpp/management/python/bin/qpid-route
new file mode 100755
index 0000000000..f51d2493e9
--- /dev/null
+++ b/qpid/cpp/management/python/bin/qpid-route
@@ -0,0 +1,635 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
+import sys
+import os
+import locale
+from qmf.console import Session, BrokerURL
+from time import sleep
+
+usage = """
+Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism]
+ qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>
+
+ qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
+ qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>
+ qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism]
+ qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>
+ qpid-route [OPTIONS] route list [<dest-broker>]
+ qpid-route [OPTIONS] route flush [<dest-broker>]
+ qpid-route [OPTIONS] route map [<broker>]
+
+ qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism]
+ qpid-route [OPTIONS] link del <dest-broker> <src-broker>
+ qpid-route [OPTIONS] link list [<dest-broker>]"""
+
+description = """
+ADDRESS syntax:
+
+ [username/password@] hostname
+ ip-address [:<port>]"""
+
+def Usage():
+ print usage
+
+class Config:
+ def __init__(self):
+ self._verbose = False
+ self._quiet = False
+ self._durable = False
+ self._dellink = False
+ self._srclocal = False
+ self._transport = "tcp"
+ self._ack = 0
+ self._credit = 0xFFFFFFFF # unlimited
+ self._connTimeout = 10
+ self._conn_options = {}
+
+config = Config()
+
+class JHelpFormatter(IndentedHelpFormatter):
+ """Format usage and description without stripping newlines from usage strings
+ """
+
+ def format_usage(self, usage):
+ return usage
+
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+def OptionsAndArguments(argv):
+ parser = OptionParser(usage=usage,
+ description=description,
+ formatter=JHelpFormatter())
+
+ parser.add_option("--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option("-v", "--verbose", action="store_true", help="Verbose output")
+ parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings")
+ parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable")
+
+ parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link")
+ parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)")
+
+ parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N")
+ parser.add_option("--credit", action="store", type="int", default=0xFFFFFFFF, metavar="<msgs>",
+ help="Maximum number of messages a sender can have outstanding (0=unlimited)")
+ parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
+
+ parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
+ parser.add_option("--sasl-service-name", action="store", type="string", help="SASL service name to use")
+ parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
+ opts, encArgs = parser.parse_args(args=argv)
+
+ try:
+ encoding = locale.getpreferredencoding()
+ args = [a.decode(encoding) for a in encArgs]
+ except:
+ args = encArgs
+
+ if opts.timeout:
+ config._connTimeout = opts.timeout
+ if config._connTimeout == 0:
+ config._connTimeout = None
+
+ if opts.verbose:
+ config._verbose = True
+
+ if opts.quiet:
+ config._quiet = True
+
+ if opts.durable:
+ config._durable = True
+
+ if opts.del_empty_link:
+ config._dellink = True
+
+ if opts.src_local:
+ config._srclocal = True
+
+ if opts.transport:
+ config._transport = opts.transport
+
+ if opts.ha_admin:
+ config._conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+
+ if opts.ack:
+ config._ack = opts.ack
+
+ config._credit = opts.credit
+
+ if opts.client_sasl_mechanism:
+ config._conn_options['mechanisms'] = opts.client_sasl_mechanism
+ if opts.sasl_service_name:
+ config._conn_options['service'] = opts.sasl_service_name
+
+ if opts.ssl_certificate:
+ config._conn_options['ssl_certfile'] = opts.ssl_certificate
+
+ if opts.ssl_key:
+ if not opts.ssl_certificate:
+ parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
+ config._conn_options['ssl_keyfile'] = opts.ssl_key
+
+ return args
+
+
+class RouteManager:
+ def __init__(self, localBroker):
+ self.brokerList = {}
+ self.local = BrokerURL(localBroker)
+ self.remote = None
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options)
+ self.broker._waitForStable()
+ self.agent = self.broker.getBrokerAgent()
+
+ def disconnect(self):
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+ except:
+ pass # ignore errors while shutting down
+
+ def getLink(self):
+ links = self.agent.getObjects(_class="link")
+ for link in links:
+ if self.remote.match(link.host, link.port):
+ return link
+ return None
+
+ def checkLink(self, link):
+ retry = 3
+ while link is None or (link.state in ("Waiting", "Connecting", "Closing") and retry > 0):
+ sleep(1)
+ link = self.getLink()
+ retry -= 1
+
+ if link == None:
+ raise Exception("Link failed to create")
+
+ if link.state == "Failed":
+ raise Exception("Link failed to create %s" % (link.lastError or ""))
+ elif config._verbose:
+ print "Link state is", link.state
+
+ def addLink(self, remoteBroker, interbroker_mechanism=""):
+ self.remote = BrokerURL(remoteBroker)
+ if self.local.match(self.remote.host, self.remote.port):
+ raise Exception("Linking broker to itself is not permitted")
+
+ brokers = self.agent.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ res = broker.connect(self.remote.host, self.remote.port, config._durable,
+ interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "",
+ config._transport)
+
+ def delLink(self, remoteBroker):
+ self.remote = BrokerURL(remoteBroker)
+ brokers = self.agent.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link not found")
+
+ res = link.close()
+ if config._verbose:
+ print "Close method returned:", res.status, res.text
+
+ def listLinks(self):
+ links = self.agent.getObjects(_class="link")
+ if len(links) == 0:
+ print "No Links Found"
+ else:
+ print
+ print "Host Port Transport Durable State Last Error"
+ print "============================================================================="
+ for link in links:
+ print "%-16s%-8d%-13s%c %-18s%s" % \
+ (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError)
+
+ def mapRoutes(self):
+ print
+ print "Finding Linked Brokers:"
+
+ self.brokerList[self.local.name()] = self.broker
+ print " %s:%s... Ok" % (self.local.host, self.local.port)
+
+ added = True
+ while added:
+ added = False
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ url = BrokerURL(host=link.host, port=link.port, user=self.broker.authUser, password=self.broker.authPass)
+ if url.name() not in self.brokerList:
+ print " %s:%s..." % (link.host, link.port)
+ try:
+ url.authName = self.local.authName
+ url.authPass = self.local.authPass
+ b = self.qmf.addBroker(url, config._connTimeout, **config._conn_options)
+ self.brokerList[url.name()] = b
+ added = True
+ print "Ok"
+ except Exception, e:
+ print e
+
+ print
+ print "Dynamic Routes:"
+ bridges = self.qmf.getObjects(_class="bridge", dynamic=True)
+ fedExchanges = []
+ for bridge in bridges:
+ if bridge.src not in fedExchanges:
+ fedExchanges.append(bridge.src)
+ if len(fedExchanges) == 0:
+ print " none found"
+ print
+
+ for ex in fedExchanges:
+ print " Exchange %s:" % ex
+ pairs = []
+ for bridge in bridges:
+ if bridge.src == ex:
+ link = bridge._linkRef_
+ fromUrl = BrokerURL(host=link.host, port=link.port)
+ toUrl = bridge.getBroker().getUrl()
+ found = False
+ for pair in pairs:
+ if pair.matches(fromUrl, toUrl):
+ found = True
+ if not found:
+ pairs.append(RoutePair(fromUrl, toUrl))
+ for pair in pairs:
+ print " %s" % pair
+ print
+
+ print "Static Routes:"
+ bridges = self.qmf.getObjects(_class="bridge", dynamic=False)
+ if len(bridges) == 0:
+ print " none found"
+ print
+
+ for bridge in bridges:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ leftType = "ex"
+ rightType = "ex"
+ if bridge.srcIsLocal:
+ arrow = "=>"
+ left = bridge.src
+ right = bridge.dest
+ if bridge.srcIsQueue:
+ leftType = "queue"
+ else:
+ arrow = "<="
+ left = bridge.dest
+ right = bridge.src
+ if bridge.srcIsQueue:
+ rightType = "queue"
+
+ if bridge.srcIsQueue:
+ print " %s(%s=%s) %s %s(%s=%s)" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right)
+ else:
+ print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
+ print
+
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+
+ def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False):
+ if dynamic and config._srclocal:
+ raise Exception("--src-local is not permitted on dynamic routes")
+
+ self.addLink(remoteBroker, interbroker_mechanism)
+ link = self.getLink()
+ self.checkLink(link)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
+ if not config._quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
+ sys.exit(0)
+
+ if config._verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(config._durable, exchange, exchange, routingKey, tag,
+ excludes, False, config._srclocal, dynamic,
+ config._ack, credit=config._credit)
+ if res.status != 0:
+ raise Exception(res.text)
+ if config._verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ):
+ self.addLink(remoteBroker, interbroker_mechanism)
+ link = self.getLink()
+ self.checkLink(link)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if not config._quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
+ sys.exit(0)
+
+ if config._verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(config._durable, queue, exchange, "", "", "", True,
+ config._srclocal, False, config._ack, credit=config._credit)
+ if res.status != 0:
+ raise Exception(res.text)
+ if config._verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def delQueueRoute(self, remoteBroker, exchange, queue):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not config._quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if config._verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and config._dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if config._verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ sys.exit(0)
+ if not config._quiet:
+ raise Exception("Route not found")
+
+ def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not config._quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
+ and bridge.dynamic == dynamic:
+ if config._verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and config._dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if config._verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ return
+ if not config._quiet:
+ raise Exception("Route not found")
+
+ def listRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ if bridge.dynamic:
+ keyText = "<dynamic>"
+ else:
+ keyText = bridge.key
+ print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
+
+ def clearAllRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ if config._verbose:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
+ res = bridge.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif config._verbose:
+ print "Ok"
+
+ if config._dellink:
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ if config._verbose:
+ print "Deleting Link: %s:%d... " % (link.host, link.port),
+ res = link.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif config._verbose:
+ print "Ok"
+
+class RoutePair:
+ def __init__(self, fromUrl, toUrl):
+ self.fromUrl = fromUrl
+ self.toUrl = toUrl
+ self.bidir = False
+
+ def __repr__(self):
+ if self.bidir:
+ delimit = "<=>"
+ else:
+ delimit = " =>"
+ return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
+
+ def matches(self, fromUrl, toUrl):
+ if fromUrl == self.fromUrl and toUrl == self.toUrl:
+ return True
+ if toUrl == self.fromUrl and fromUrl == self.toUrl:
+ self.bidir = True
+ return True
+ return False
+
+
+def YN(val):
+ if val == 1:
+ return 'Y'
+ return 'N'
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ nargs = len(args)
+ if nargs < 2:
+ Usage()
+ return(-1)
+
+ if nargs == 2:
+ localBroker = "localhost"
+ else:
+ if config._srclocal:
+ localBroker = args[3]
+ remoteBroker = args[2]
+ else:
+ localBroker = args[2]
+ if nargs > 3:
+ remoteBroker = args[3]
+
+ group = args[0]
+ cmd = args[1]
+
+ rm = None
+ try:
+ rm = RouteManager(localBroker)
+ if group == "link":
+ if cmd == "add":
+ if nargs < 3 or nargs > 5:
+ Usage()
+ return(-1)
+ interbroker_mechanism = ""
+ if nargs > 4: interbroker_mechanism = args[4]
+ rm.addLink(remoteBroker, interbroker_mechanism)
+ rm.checkLink(rm.getLink())
+ elif cmd == "del":
+ if nargs != 4:
+ Usage()
+ return(-1)
+ rm.delLink(remoteBroker)
+ elif cmd == "list":
+ rm.listLinks()
+
+ elif group == "dynamic":
+ if cmd == "add":
+ if nargs < 5 or nargs > 8:
+ Usage()
+ return(-1)
+
+ tag = ""
+ excludes = ""
+ interbroker_mechanism = ""
+ if nargs > 5: tag = args[5]
+ if nargs > 6: excludes = args[6]
+ if nargs > 7: interbroker_mechanism = args[7]
+ rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True)
+ elif cmd == "del":
+ if nargs != 5:
+ Usage()
+ return(-1)
+ else:
+ rm.delRoute(remoteBroker, args[4], "", dynamic=True)
+
+ elif group == "route":
+ if cmd == "add":
+ if nargs < 6 or nargs > 9:
+ Usage()
+ return(-1)
+
+ tag = ""
+ excludes = ""
+ interbroker_mechanism = ""
+ if nargs > 6: tag = args[6]
+ if nargs > 7: excludes = args[7]
+ if nargs > 8: interbroker_mechanism = args[8]
+ rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False)
+ elif cmd == "del":
+ if nargs != 6:
+ Usage()
+ return(-1)
+ rm.delRoute(remoteBroker, args[4], args[5], dynamic=False)
+ elif cmd == "map":
+ rm.mapRoutes()
+ else:
+ if cmd == "list":
+ rm.listRoutes()
+ elif cmd == "flush":
+ rm.clearAllRoutes()
+ else:
+ Usage()
+ return(-1)
+
+ elif group == "queue":
+ if nargs < 6 or nargs > 7:
+ Usage()
+ return(-1)
+ if cmd == "add":
+ interbroker_mechanism = ""
+ if nargs > 6: interbroker_mechanism = args[6]
+ rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] )
+ elif cmd == "del":
+ rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
+ else:
+ Usage()
+ return(-1)
+ else:
+ Usage()
+ return(-1)
+
+ except Exception,e:
+ if rm:
+ rm.disconnect() # try to release broker resources
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ return 1
+
+ rm.disconnect()
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())