diff options
-rwxr-xr-x | python/commands/qpid-route | 294 |
1 files changed, 182 insertions, 112 deletions
diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 47eeef3ff2..e64889b2af 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -25,80 +25,81 @@ import socket import os from qpid import qmfconsole -def Usage (): - print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>" - print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" - print " qpid-route [OPTIONS] link list [<dest-broker>]" - print " qpid-route [OPTIONS] link map [<broker>]" +def Usage(): + print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]" + print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>" print print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]" print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>" + print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>" + print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>" print " qpid-route [OPTIONS] route list [<dest-broker>]" print " qpid-route [OPTIONS] route flush [<dest-broker>]" - print " qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]" - print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>" + print " qpid-route [OPTIONS] route map [<broker>]" + print + print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>" + print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" + print " qpid-route [OPTIONS] link list [<dest-broker>]" print print "Options:" print " -v [ --verbose ] Verbose output" print " -q [ --quiet ] Quiet output, don't print duplicate warnings" print " -d [ --durable ] Added configuration shall be durable" print " -e [ --del-empty-link ] Delete link after deleting last route on the link" + print " -s [ --src-local ] Make connection to source broker (push route)" print " -t <transport> [ --transport <transport>]" print " Specify transport to use for links, defaults to tcp" print print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]" print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print - sys.exit (1) + sys.exit(1) -_verbose = False -_quiet = False -_durable = False -_dellink = False +_verbose = False +_quiet = False +_durable = False +_dellink = False +_srclocal = False _transport = "tcp" class RouteManager: - def __init__ (self, destBroker): - self.dest = qmfconsole.BrokerURL(destBroker) - self.src = None + def __init__(self, localBroker): + self.local = qmfconsole.BrokerURL(localBroker) + self.remote = None self.qmf = qmfconsole.Session() - self.broker = self.qmf.addBroker(destBroker) + self.broker = self.qmf.addBroker(localBroker) - def Disconnect (self): + def disconnect(self): self.qmf.delBroker(self.broker) - def getLink (self): + def getLink(self): links = self.qmf.getObjects(_class="link") for link in links: - if self.src.match(link.host, link.port): + if self.remote.match(link.host, link.port): return link return None - def AddLink (self, srcBroker): - self.src = qmfconsole.BrokerURL(srcBroker) - if self.dest.match(self.src.host, self.src.port): - print "Linking broker to itself is not permitted" - sys.exit(1) + def addLink(self, remoteBroker): + self.remote = qmfconsole.BrokerURL(remoteBroker) + if self.local.match(self.remote.host, self.remote.port): + raise Exception("Linking broker to itself is not permitted") brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() - if link != None: - raise Exception("Link already exists") - - if self.src.authName == "anonymous": - mech = "ANONYMOUS" - else: - mech = "PLAIN" - res = broker.connect(self.src.host, self.src.port, _durable, - mech, self.src.authName, self.src.authPass, - _transport) - if _verbose: - print "Connect method returned:", res.status, res.text - link = self.getLink() + if link == None: + if self.remote.authName == "anonymous": + mech = "ANONYMOUS" + else: + mech = "PLAIN" + res = broker.connect(self.remote.host, self.remote.port, _durable, + mech, self.remote.authName, self.remote.authPass, + _transport) + if _verbose: + print "Connect method returned:", res.status, res.text - def DelLink (self, srcBroker): - self.src = qmfconsole.BrokerURL(srcBroker) + def delLink(self, remoteBroker): + self.remote = qmfconsole.BrokerURL(remoteBroker) brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() @@ -109,7 +110,7 @@ class RouteManager: if _verbose: print "Close method returned:", res.status, res.text - def ListLinks (self): + def listLinks(self): links = self.qmf.getObjects(_class="link") if len(links) == 0: print "No Links Found" @@ -121,14 +122,14 @@ class RouteManager: print "%-16s%-8d %c %-18s%s" % \ (link.host, link.port, YN(link.durable), link.state, link.lastError) - def MapLinks(self): + def mapRoutes(self): qmf = self.qmf print print "Finding Linked Brokers:" brokerList = {} - brokerList[self.dest.name()] = self.broker - print " %s... Ok" % self.dest + brokerList[self.local.name()] = self.broker + print " %s... Ok" % self.local added = True while added: @@ -155,8 +156,7 @@ class RouteManager: fedExchanges.append(bridge.src) if len(fedExchanges) == 0: print " none found" - else: - print + print for ex in fedExchanges: print " Exchange %s:" % ex @@ -180,71 +180,124 @@ class RouteManager: bridges = qmf.getObjects(_class="bridge", dynamic=False) if len(bridges) == 0: print " none found" - else: - print + print for bridge in bridges: link = bridge._linkRef_ fromUrl = "%s:%s" % (link.host, link.port) toUrl = bridge.getBroker().getUrl() - print " %s(%s) <= %s(%s) key=%s" % (toUrl, bridge.dest, fromUrl, bridge.src, bridge.key) + leftType = "ex" + rightType = "ex" + if bridge.srcIsLocal: + arrow = "=>" + left = bridge.src + right = bridge.dest + if bridge.srcIsQueue: + leftType = "queue" + else: + arrow = "<=" + left = bridge.dest + right = bridge.src + if bridge.srcIsQueue: + rightType = "queue" + + if bridge.srcIsQueue: + print " %s(%s=%s) %s %s(%s=%s)" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right) + else: + print " %s(%s=%s) %s %s(%s=%s) key=%s" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) print for broker in brokerList: - if broker != self.dest.name(): + if broker != self.local.name(): qmf.delBroker(brokerList[broker]) - def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False): - self.src = qmfconsole.BrokerURL(srcBroker) - if self.dest.match(self.src.host, self.src.port): - raise Exception("Linking broker to itself is not permitted") - - brokers = self.qmf.getObjects(_class="broker") - broker = brokers[0] + def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False): + if dynamic and _srclocal: + raise Exception("--src-local is not permitted on dynamic routes") + self.addLink(remoteBroker) link = self.getLink() if link == None: - if _verbose: - print "Inter-broker link not found, creating..." + raise Exception("Link failed to create") - if self.src.authName == "anonymous": - mech = "ANONYMOUS" - else: - mech = "PLAIN" - res = broker.connect(self.src.host, self.src.port, _durable, - mech, self.src.authName, self.src.authPass, - _transport) - if _verbose: - print "Connect method returned:", res.status, res.text - link = self.getLink() + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: + if not _quiet: + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) + sys.exit(0) + if _verbose: + print "Creating inter-broker binding..." + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic) + if res.status != 0: + raise Exception(res.text) + if _verbose: + print "Bridge method returned:", res.status, res.text + + def addQueueRoute(self, remoteBroker, exchange, queue): + self.addLink(remoteBroker) + link = self.getLink() if link == None: - raise Exception("Protocol Error - Missing link ID") + raise Exception("Link failed to create") bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if bridge.linkRef == link.getObjectId() and \ - bridge.dest == exchange and bridge.key == routingKey: + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: if not _quiet: - raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) - sys.exit (0) + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) + sys.exit(0) if _verbose: print "Creating inter-broker binding..." - res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, dynamic) + res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False) if res.status != 0: raise Exception(res.text) if _verbose: print "Bridge method returned:", res.status, res.text - def DelRoute (self, srcBroker, exchange, routingKey, dynamic=False): - self.src = qmfconsole.BrokerURL(srcBroker) + def delQueueRoute(self, remoteBroker, exchange, queue): + self.remote = qmfconsole.BrokerURL(remoteBroker) link = self.getLink() if link == None: if not _quiet: - raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name())) - sys.exit (0) + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) + + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: + if _verbose: + print "Closing bridge..." + res = bridge.close() + if res.status != 0: + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and _dellink: + link = self.getLink() + if link == None: + sys.exit(0) + if _verbose: + print "Last bridge on link, closing link..." + res = link.close() + if res.status != 0: + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) + if not _quiet: + raise Exception("Route not found") + + def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): + self.remote = qmfconsole.BrokerURL(remoteBroker) + link = self.getLink() + if link == None: + if not _quiet: + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: @@ -255,20 +308,20 @@ class RouteManager: res = bridge.close() if res.status != 0: raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) - if len (bridges) == 1 and _dellink: - link = self.getLink () + if len(bridges) == 1 and _dellink: + link = self.getLink() if link == None: - sys.exit (0) + sys.exit(0) if _verbose: print "Last bridge on link, closing link..." res = link.close() if res.status != 0: raise Exception("Error closing link: %d - %s" % (res.status, res.text)) - sys.exit (0) + sys.exit(0) if not _quiet: raise Exception("Route not found") - def ListRoutes (self): + def listRoutes(self): links = self.qmf.getObjects(_class="link") bridges = self.qmf.getObjects(_class="bridge") @@ -283,9 +336,9 @@ class RouteManager: keyText = "<dynamic>" else: keyText = bridge.key - print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, keyText) + print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText) - def ClearAllRoutes (self): + def clearAllRoutes(self): links = self.qmf.getObjects(_class="link") bridges = self.qmf.getObjects(_class="bridge") @@ -347,10 +400,10 @@ def YN(val): ## try: - longOpts = ("verbose", "quiet", "durable", "del-empty-link", "transport=") - (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "vqdet:", longOpts) + longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=") + (optlist, cargs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts) except: - Usage () + Usage() for opt in optlist: if opt[0] == "-v" or opt[0] == "--verbose": @@ -361,77 +414,94 @@ for opt in optlist: _durable = True if opt[0] == "-e" or opt[0] == "--del-empty-link": _dellink = True + if opt[0] == "-s" or opt[0] == "--src-local": + _srclocal = True if opt[0] == "-t" or opt[0] == "--transport": _transport = opt[1] -nargs = len (cargs) +nargs = len(cargs) if nargs < 2: - Usage () + Usage() if nargs == 2: - destBroker = "localhost" + localBroker = "localhost" else: - destBroker = cargs[2] + if _srclocal: + localBroker = cargs[3] + remoteBroker = cargs[2] + else: + localBroker = cargs[2] + if nargs > 3: + remoteBroker = cargs[3] group = cargs[0] cmd = cargs[1] try: - rm = RouteManager (destBroker) + rm = RouteManager(localBroker) if group == "link": if cmd == "add": if nargs != 4: Usage() - rm.AddLink (cargs[3]) + rm.addLink(remoteBroker) elif cmd == "del": if nargs != 4: Usage() - rm.DelLink (cargs[3]) + rm.delLink(remoteBroker) elif cmd == "list": - rm.ListLinks() - elif cmd == "map": - rm.MapLinks() + rm.listLinks() elif group == "dynamic": if cmd == "add": if nargs < 5 or nargs > 7: - Usage () + Usage() tag = "" excludes = "" if nargs > 5: tag = cargs[5] if nargs > 6: excludes = cargs[6] - rm.AddRoute (cargs[3], cargs[4], "", tag, excludes, dynamic=True) + rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True) elif cmd == "del": if nargs != 5: - Usage () + Usage() else: - rm.DelRoute (cargs[3], cargs[4], "", dynamic=True) + rm.delRoute(remoteBroker, cargs[4], "", dynamic=True) elif group == "route": if cmd == "add": if nargs < 6 or nargs > 8: - Usage () + Usage() tag = "" excludes = "" if nargs > 6: tag = cargs[6] - if nargs > 7: excludes = cargs[7] - rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes, dynamic=False) + if nargs > 7: excludes = cargs[7] + rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False) elif cmd == "del": if nargs != 6: - Usage () - else: - rm.DelRoute (cargs[3], cargs[4], cargs[5], dynamic=False) + Usage() + rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False) + elif cmd == "map": + rm.mapRoutes() else: - if cmd == "list": - rm.ListRoutes () + if cmd == "list": + rm.listRoutes() elif cmd == "flush": - rm.ClearAllRoutes () + rm.clearAllRoutes() else: - Usage () + Usage() + + elif group == "queue": + if nargs != 6: + Usage() + if cmd == "add": + rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) + elif cmd == "del": + rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) + else: + Usage() except Exception,e: print "Failed:", e.args[0] sys.exit(1) -rm.Disconnect () +rm.disconnect() |