diff options
Diffstat (limited to 'trunk/qpid/python/commands/qpid-config')
-rwxr-xr-x | trunk/qpid/python/commands/qpid-config | 495 |
1 files changed, 0 insertions, 495 deletions
diff --git a/trunk/qpid/python/commands/qpid-config b/trunk/qpid/python/commands/qpid-config deleted file mode 100755 index 39af67f39c..0000000000 --- a/trunk/qpid/python/commands/qpid-config +++ /dev/null @@ -1,495 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import getopt -import sys -import locale -from qmf.console import Session - -_recursive = False -_host = "localhost" -_connTimeout = 10 -_altern_ex = None -_passive = False -_durable = False -_clusterDurable = False -_if_empty = True -_if_unused = True -_fileCount = 8 -_fileSize = 24 -_maxQueueSize = None -_maxQueueCount = None -_limitPolicy = None -_order = None -_msgSequence = False -_ive = False -_eventGeneration = None - -FILECOUNT = "qpid.file_count" -FILESIZE = "qpid.file_size" -MAX_QUEUE_SIZE = "qpid.max_size" -MAX_QUEUE_COUNT = "qpid.max_count" -POLICY_TYPE = "qpid.policy_type" -CLUSTER_DURABLE = "qpid.persist_last_node" -LVQ = "qpid.last_value_queue" -LVQNB = "qpid.last_value_queue_no_browse" -MSG_SEQUENCE = "qpid.msg_sequence" -IVE = "qpid.ive" -QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" - -def Usage (): - print "Usage: qpid-config [OPTIONS]" - print " qpid-config [OPTIONS] exchanges [filter-string]" - print " qpid-config [OPTIONS] queues [filter-string]" - print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]" - print " qpid-config [OPTIONS] del exchange <name>" - print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]" - print " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]" - print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]" - print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]" - print - print "Options:" - print " --timeout seconds (10) Maximum time to wait for broker connection" - print " -b [ --bindings ] Show bindings in queue or exchange list" - print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" - print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" - print - print "Add Queue Options:" - print " --alternate-exchange [name of the alternate exchange]" - print " The alternate-exchange field specifies how messages on this queue should" - print " be treated when they are rejected by a subscriber, or when they are" - print " orphaned by queue deletion. When present, rejected or orphaned messages" - print " MUST be routed to the alternate-exchange. In all cases the messages MUST" - print " be removed from the queue." - print " --passive Do not actually change the broker state (queue will not be created)" - print " --durable Queue is durable" - print " --cluster-durable Queue becomes durable if there is only one functioning cluster node" - print " --file-count N (8) Number of files in queue's persistence journal" - print " --file-size N (24) File size in pages (64Kib/page)" - print " --max-queue-size N Maximum in-memory queue size as bytes" - print " --max-queue-count N Maximum in-memory queue size as a number of messages" - print " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]" - print " Action taken when queue limit is reached:" - print " none (default) - Use broker's default policy" - print " reject - Reject enqueued messages" - print " flow-to-disk - Page messages to disk" - print " ring - Replace oldest unacquired message with new" - print " ring-strict - Replace oldest message, reject if oldest is acquired" - print " --order [fifo | lvq | lvq-no-browse]" - print " Set queue ordering policy:" - print " fifo (default) - First in, first out" - print " lvq - Last Value Queue ordering, allows queue browsing" - print " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data" - print " --generate-queue-events N" - print " If set to 1, every enqueue will generate an event that can be processed by" - print " registered listeners (e.g. for replication). If set to 2, events will be" - print " generated for enqueues and dequeues" - print - print "Del Queue Options:" - print " --force Force delete of queue even if it's currently used or it's not empty" - print " --force-if-not-empty Force delete of queue even if it's not empty" - print " --force-if-used Force delete of queue even if it's currently used" - print - print "Add Exchange <type> values:" - print " direct Direct exchange for point-to-point communication" - print " fanout Fanout exchange for broadcast communication" - print " topic Topic exchange that routes messages using binding keys with wildcards" - print " headers Headers exchange that matches header fields against the binding keys" - print - print "Add Exchange Options:" - print " --alternate-exchange [name of the alternate exchange]" - print " In the event that a message cannot be routed, this is the name of the exchange to" - print " which the message will be sent. Messages transferred using message.transfer will" - print " be routed to the alternate-exchange only if they are sent with the \"none\"" - print " accept-mode, and the discard-unroutable delivery property is set to false, and" - print " there is no queue to route to for the given message according to the bindings" - print " on this exchange." - print " --passive Do not actually change the broker state (exchange will not be created)" - print " --durable Exchange is durable" - print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header" - print " with a value that increments for each message forwarded." - print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference" - print " to the last message forwarded and enqueuing that message to newly bound" - print " queues." - print - sys.exit (1) - -class BrokerManager: - def __init__ (self): - self.brokerName = None - self.qmf = None - self.broker = None - - def SetBroker (self, brokerUrl): - self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == 0: - self.brokerAgent = a - - def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) - - def Overview (self): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - print "Total Exchanges: %d" % len (exchanges) - etype = {} - for ex in exchanges: - if ex.type not in etype: - etype[ex.type] = 1 - else: - etype[ex.type] = etype[ex.type] + 1 - for typ in etype: - print "%15s: %d" % (typ, etype[typ]) - - print - print " Total Queues: %d" % len (queues) - _durable = 0 - for queue in queues: - if queue.durable: - _durable = _durable + 1 - print " durable: %d" % _durable - print " non-durable: %d" % (len (queues) - _durable) - - def ExchangeList (self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - caption1 = "Type " - caption2 = "Exchange Name" - maxNameLen = len(caption2) - for ex in exchanges: - if self.match(ex.name, filter): - if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) - print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) - line = "" - for i in range(((maxNameLen + len(caption1)) / 5) + 5): - line += "=====" - print line - - for ex in exchanges: - if self.match (ex.name, filter): - print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), - args = ex.arguments - if ex.durable: print "--durable", - if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", - if IVE in args and args[IVE] == 1: print "--ive", - if ex.altExchange: - print "--alternate-exchange=%s" % ex._altExchange_.name, - print - - def ExchangeListRecurse (self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - for ex in exchanges: - if self.match (ex.name, filter): - print "Exchange '%s' (%s)" % (ex.name, ex.type) - for bind in bindings: - if bind.exchangeRef == ex.getObjectId(): - qname = "<unknown>" - queue = self.findById (queues, bind.queueRef) - if queue != None: - qname = queue.name - print " bind [%s] => %s" % (bind.bindingKey, qname) - - - def QueueList (self, filter): - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - - caption = "Queue Name" - maxNameLen = len(caption) - for q in queues: - if self.match (q.name, filter): - if len(q.name) > maxNameLen: maxNameLen = len(q.name) - print "%-*s Attributes" % (maxNameLen, caption) - line = "" - for i in range((maxNameLen / 5) + 5): - line += "=====" - print line - - for q in queues: - if self.match (q.name, filter): - print "%-*s " % (maxNameLen, q.name), - args = q.arguments - if q.durable: print "--durable", - if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", - if q.autoDelete: print "auto-del", - if q.exclusive: print "excl", - if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], - if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], - if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], - if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], - if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), - if LVQ in args and args[LVQ] == 1: print "--order lvq", - if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", - if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], - if q.altExchange: - print "--alternate-exchange=%s" % q._altExchange_.name, - print - - def QueueListRecurse (self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - for queue in queues: - if self.match (queue.name, filter): - print "Queue '%s'" % queue.name - for bind in bindings: - if bind.queueRef == queue.getObjectId(): - ename = "<unknown>" - ex = self.findById (exchanges, bind.exchangeRef) - if ex != None: - ename = ex.name - if ename == "": - ename = "''" - print " bind [%s] => %s" % (bind.bindingKey, ename) - - def AddExchange (self, args): - if len (args) < 2: - Usage () - etype = args[0] - ename = args[1] - declArgs = {} - if _msgSequence: - declArgs[MSG_SEQUENCE] = 1 - if _ive: - declArgs[IVE] = 1 - if _altern_ex != None: - self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) - else: - self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs) - - def DelExchange (self, args): - if len (args) < 1: - Usage () - ename = args[0] - self.broker.getAmqpSession().exchange_delete (exchange=ename) - - def AddQueue (self, args): - if len (args) < 1: - Usage () - qname = args[0] - declArgs = {} - if _durable: - declArgs[FILECOUNT] = _fileCount - declArgs[FILESIZE] = _fileSize - - if _maxQueueSize: - declArgs[MAX_QUEUE_SIZE] = _maxQueueSize - if _maxQueueCount: - declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - if _limitPolicy: - if _limitPolicy == "none": - pass - elif _limitPolicy == "reject": - declArgs[POLICY_TYPE] = "reject" - elif _limitPolicy == "flow-to-disk": - declArgs[POLICY_TYPE] = "flow_to_disk" - elif _limitPolicy == "ring": - declArgs[POLICY_TYPE] = "ring" - elif _limitPolicy == "ring-strict": - declArgs[POLICY_TYPE] = "ring_strict" - - if _clusterDurable: - declArgs[CLUSTER_DURABLE] = 1 - if _order: - if _order == "fifo": - pass - elif _order == "lvq": - declArgs[LVQ] = 1 - elif _order == "lvq-no-browse": - declArgs[LVQNB] = 1 - if _eventGeneration: - declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration - - if _altern_ex != None: - self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) - else: - self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs) - - def DelQueue (self, args): - if len (args) < 1: - Usage () - qname = args[0] - self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused) - - def Bind (self, args): - if len (args) < 2: - Usage () - ename = args[0] - qname = args[1] - key = "" - if len (args) > 2: - key = args[2] - self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) - - def Unbind (self, args): - if len (args) < 2: - Usage () - ename = args[0] - qname = args[1] - key = "" - if len (args) > 2: - key = args[2] - self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) - - def findById (self, items, id): - for item in items: - if item.getObjectId() == id: - return item - return None - - def match (self, name, filter): - if filter == "": - return True - if name.find (filter) == -1: - return False - return True - -def YN (bool): - if bool: - return 'Y' - return 'N' - - -## -## Main Program -## - -try: - longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=", - "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=", - "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty", - "force_if_used", "alternate-exchange=", "passive", "timeout=") - (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) -except: - Usage () - -try: - encoding = locale.getpreferredencoding() - cargs = [a.decode(encoding) for a in encArgs] -except: - cargs = encArgs - -for opt in optlist: - if opt[0] == "-b" or opt[0] == "--bindings": - _recursive = True - if opt[0] == "-a" or opt[0] == "--broker-addr": - _host = opt[1] - if opt[0] == "--timeout": - _connTimeout = int(opt[1]) - if _connTimeout == 0: - _connTimeout = None - if opt[0] == "--alternate-exchange": - _altern_ex = opt[1] - if opt[0] == "--passive": - _passive = True - if opt[0] == "--durable": - _durable = True - if opt[0] == "--cluster-durable": - _clusterDurable = True - if opt[0] == "--file-count": - _fileCount = int (opt[1]) - if opt[0] == "--file-size": - _fileSize = int (opt[1]) - if opt[0] == "--max-queue-size": - _maxQueueSize = int (opt[1]) - if opt[0] == "--max-queue-count": - _maxQueueCount = int (opt[1]) - if opt[0] == "--limit-policy": - _limitPolicy = opt[1] - if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"): - print "Error: Invalid --limit-policy argument" - sys.exit(1) - if opt[0] == "--order": - _order = opt[1] - if _order not in ("fifo", "lvq", "lvq-no-browse"): - print "Error: Invalid --order argument" - sys.exit(1) - if opt[0] == "--sequence": - _msgSequence = True - if opt[0] == "--ive": - _ive = True - if opt[0] == "--generate-queue-events": - _eventGeneration = int (opt[1]) - if opt[0] == "--force": - _if_empty = False - _if_unused = False - if opt[0] == "--force-if-not-empty": - _if_empty = False - if opt[0] == "--force-if-used": - _if_unused = False - - -nargs = len (cargs) -bm = BrokerManager () - -try: - bm.SetBroker(_host) - if nargs == 0: - bm.Overview () - else: - cmd = cargs[0] - modifier = "" - if nargs > 1: - modifier = cargs[1] - if cmd == "exchanges": - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd == "queues": - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (cargs[2:]) - elif modifier == "queue": - bm.AddQueue (cargs[2:]) - else: - Usage () - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (cargs[2:]) - elif modifier == "queue": - bm.DelQueue (cargs[2:]) - else: - Usage () - elif cmd == "bind": - bm.Bind (cargs[1:]) - elif cmd == "unbind": - bm.Unbind (cargs[1:]) - else: - Usage () -except KeyboardInterrupt: - print -except Exception,e: - print "Failed: %s: %s" % (e.__class__.__name__, e) - sys.exit(1) - -bm.Disconnect() |