diff options
author | Nuno Santos <nsantos@apache.org> | 2008-03-28 20:53:44 +0000 |
---|---|---|
committer | Nuno Santos <nsantos@apache.org> | 2008-03-28 20:53:44 +0000 |
commit | 10253f694e4ad13ea4aff59ae712513b34d789b6 (patch) | |
tree | b25d1c5f2dfb542aed77e45bd7339f7607f3d213 | |
parent | d09f88902502a8ef020dcfedaf1d2b868c8ee077 (diff) | |
download | qpid-python-10253f694e4ad13ea4aff59ae712513b34d789b6.tar.gz |
QPID-885: patch from Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@642375 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | python/commands/qpid-config | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/python/commands/qpid-config b/python/commands/qpid-config new file mode 100755 index 0000000000..be0fc5a67f --- /dev/null +++ b/python/commands/qpid-config @@ -0,0 +1,342 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import socket +import qpid +from threading import Condition +from qpid.management import managementClient +from qpid.peer import Closed +from qpid.client import Client +from time import sleep + +defspecpath = "/usr/share/amqp/amqp.0-10-preview.xml" +specpath = defspecpath +recursive = False +host = "localhost" + +def Usage (): + print "Usage: qpid-config [OPTIONS]" + print " qpid-config [OPTIONS] exchanges [filter-string]" + print " qpid-config [OPTIONS] queues [filter-string]" + print " qpid-config [OPTIONS] add exchange <type> <name> [durable]" + print " qpid-config [OPTIONS] del exchange <name>" + print " qpid-config [OPTIONS] add queue <name> [durable]" + print " qpid-config [OPTIONS] del queue <name>" + print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]" + print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]" + print + print "Options:" + print " -b show bindings" + print " -a <broker-addr> default: localhost" + print " broker-addr is in the form: hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000" + print " -s <amqp-spec-file> default:", defspecpath + print + sys.exit (1) + +class Broker: + def __init__ (self, text): + colon = text.find (":") + if colon == -1: + host = text + self.port = 5672 + else: + host = text[:colon] + self.port = int (text[colon+1:]) + self.host = socket.gethostbyname (host) + + def name (self): + return self.host + ":" + str (self.port) + +class BrokerManager: + def __init__ (self): + self.dest = None + self.src = None + self.broker = None + + def SetBroker (self, broker): + self.broker = broker + + def ConnectToBroker (self): + try: + self.spec = qpid.spec.load (specpath) + self.client = Client (self.broker.host, self.broker.port, self.spec) + self.client.start ({"LOGIN":"guest","PASSWORD":"guest"}) + self.channel = self.client.channel (1) + self.mclient = managementClient (self.spec) + self.mchannel = self.mclient.addChannel (self.channel) + except socket.error, e: + print "Connect Error:", e + exit (1) + + def Overview (self): + self.ConnectToBroker () + mc = self.mclient + mch = self.mchannel + mc.syncWaitForStable (mch) + exchanges = mc.syncGetObjects (mch, "exchange") + queues = mc.syncGetObjects (mch, "queue") + print "Total Exchanges: %d" % len (exchanges) + etype = {} + for ex in exchanges: + if ex.type not in etype: + etype[ex.type] = 1 + else: + etype[ex.type] = etype[ex.type] + 1 + for typ in etype: + print "%15s: %d" % (typ, etype[typ]) + + print + print " Total Queues: %d" % len (queues) + durable = 0 + for queue in queues: + if queue.durable: + durable = durable + 1 + print " durable: %d" % durable + print " non-durable: %d" % (len (queues) - durable) + + def ExchangeList (self, filter): + self.ConnectToBroker () + mc = self.mclient + mch = self.mchannel + mc.syncWaitForStable (mch) + exchanges = mc.syncGetObjects (mch, "exchange") + print "Type Bindings Exchange Name" + print "=============================================" + for ex in exchanges: + if self.match (ex.name, filter): + print "%-10s%5d %s" % (ex.type, ex.bindings, ex.name) + + def ExchangeListRecurse (self, filter): + self.ConnectToBroker () + mc = self.mclient + mch = self.mchannel + mc.syncWaitForStable (mch) + exchanges = mc.syncGetObjects (mch, "exchange") + bindings = mc.syncGetObjects (mch, "binding") + queues = mc.syncGetObjects (mch, "queue") + for ex in exchanges: + if self.match (ex.name, filter): + print "Exchange '%s' (%s)" % (ex.name, ex.type) + for bind in bindings: + if bind.exchangeRef == ex.id: + qname = "<unknown>" + queue = self.findById (queues, bind.queueRef) + if queue != None: + qname = queue.name + print " bind [%s] => %s" % (bind.bindingKey, qname) + + + def QueueList (self, filter): + self.ConnectToBroker () + mc = self.mclient + mch = self.mchannel + mc.syncWaitForStable (mch) + queues = mc.syncGetObjects (mch, "queue") + print "Durable AutoDel Excl Bindings Queue Name" + print "===============================================================" + for q in queues: + if self.match (q.name, filter): + print "%4c%9c%7c%10d %s" % (tf (q.durable), tf (q.autoDelete), tf (q.exclusive), + q.bindings, q.name) + + def QueueListRecurse (self, filter): + self.ConnectToBroker () + mc = self.mclient + mch = self.mchannel + mc.syncWaitForStable (mch) + exchanges = mc.syncGetObjects (mch, "exchange") + bindings = mc.syncGetObjects (mch, "binding") + queues = mc.syncGetObjects (mch, "queue") + for queue in queues: + if self.match (queue.name, filter): + print "Queue '%s'" % queue.name + for bind in bindings: + if bind.queueRef == queue.id: + ename = "<unknown>" + ex = self.findById (exchanges, bind.exchangeRef) + if ex != None: + ename = ex.name + if ename == "": + ename = "''" + print " bind [%s] => %s" % (bind.bindingKey, ename) + + def AddExchange (self, args): + if len (args) < 2: + Usage () + self.ConnectToBroker () + etype = args[0] + ename = args[1] + _durable = False + if len (args) > 2 and args[2] == "durable": + _durable = True + + try: + self.channel.exchange_declare (exchange=ename, type=etype, durable=_durable) + except Closed, e: + print "Failed:", e + + def DelExchange (self, args): + if len (args) < 1: + Usage () + self.ConnectToBroker () + ename = args[0] + + try: + self.channel.exchange_delete (exchange=ename) + except Closed, e: + print "Failed:", e + + def AddQueue (self, args): + if len (args) < 1: + Usage () + self.ConnectToBroker () + qname = args[0] + _durable = False + if len (args) > 1 and args[1] == "durable": + _durable = True + + try: + self.channel.queue_declare (queue=qname, durable=_durable) + except Closed, e: + print "Failed:", e + + def DelQueue (self, args): + if len (args) < 1: + Usage () + self.ConnectToBroker () + qname = args[0] + + try: + self.channel.queue_delete (queue=qname) + except Closed, e: + print "Failed:", e + + def Bind (self, args): + if len (args) < 2: + Usage () + self.ConnectToBroker () + ename = args[0] + qname = args[1] + key = "" + if len (args) > 2: + key = args[2] + + try: + self.channel.queue_bind (queue=qname, exchange=ename, routing_key=key) + except Closed, e: + print "Failed:", e + + def Unbind (self, args): + if len (args) < 2: + Usage () + self.ConnectToBroker () + ename = args[0] + qname = args[1] + key = "" + if len (args) > 2: + key = args[2] + + try: + self.channel.queue_unbind (queue=qname, exchange=ename, routing_key=key) + except Closed, e: + print "Failed:", e + + def findById (self, items, id): + for item in items: + if item.id == id: + return item + return None + + def match (self, name, filter): + if filter == "": + return True + if name.find (filter) == -1: + return False + return True + +def tf (bool): + if bool: + return 'Y' + return 'N' + +## +## Main Program +## + +try: + (optlist, cargs) = getopt.getopt (sys.argv[1:], "s:a:b") +except: + Usage () + +for opt in optlist: + if opt[0] == "-s": + specpath = opt[1] + if opt[0] == "-b": + recursive = True + if opt[0] == "-a": + host = opt[1] + +nargs = len (cargs) +bm = BrokerManager () +bm.SetBroker (Broker (host)) + +if nargs == 0: + bm.Overview () +else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd[0] == 'e': + if recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd[0] == 'q': + if recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) + else: + Usage () + |