diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
tree | 1391da89470593209466df68c0b40b89c14963b1 /tools | |
parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/setup.py | 23 | ||||
-rwxr-xr-x | tools/src/py/qmf-tool | 2 | ||||
-rwxr-xr-x | tools/src/py/qpid-config | 199 | ||||
-rwxr-xr-x | tools/src/py/qpid-ha | 153 | ||||
-rwxr-xr-x | tools/src/py/qpid-ha-tool | 183 | ||||
-rwxr-xr-x | tools/src/py/qpid-route | 10 | ||||
-rwxr-xr-x | tools/src/py/qpid-stat | 195 | ||||
-rw-r--r-- | tools/src/py/qpidtoollibs/__init__.py | 4 | ||||
-rw-r--r-- | tools/src/py/qpidtoollibs/broker.py | 168 | ||||
-rw-r--r-- | tools/src/py/qpidtoollibs/disp.py | 41 |
10 files changed, 579 insertions, 399 deletions
diff --git a/tools/setup.py b/tools/setup.py index 925c20bee9..302c25502f 100755 --- a/tools/setup.py +++ b/tools/setup.py @@ -20,20 +20,21 @@ from distutils.core import setup setup(name="qpid-tools", - version="0.15", + version="0.17", author="Apache Qpid", author_email="dev@qpid.apache.org", + package_dir={'' : 'src/py'}, packages=["qpidtoollibs"], - scripts=["qpid-cluster", - "qpid-cluster-store", - "qpid-config", - "qpid-ha-status", - "qpid-printevents", - "qpid-queue-stats", - "qpid-route", - "qpid-stat", - "qpid-tool", - "qmf-tool"], + scripts=["src/py/qpid-cluster", + "src/py/qpid-cluster-store", + "src/py/qpid-config", + "src/py/qpid-ha", + "src/py/qpid-printevents", + "src/py/qpid-queue-stats", + "src/py/qpid-route", + "src/py/qpid-stat", + "src/py/qpid-tool", + "src/py/qmf-tool"], url="http://qpid.apache.org/", license="Apache Software License", description="Diagnostic and management tools for Apache Qpid brokers.") diff --git a/tools/src/py/qmf-tool b/tools/src/py/qmf-tool index 8413ca2ca0..db51c96796 100755 --- a/tools/src/py/qmf-tool +++ b/tools/src/py/qmf-tool @@ -266,7 +266,7 @@ class QmfData: self.conn_options = conn_options self.qmf_options = qmf_options self.agent_filter = '[]' - self.connection = cqpid.Connection(self.url, self.conn_options) + self.connection = cqpid.Connection(self.url, **self.conn_options) self.connection.open() self.session = qmf2.ConsoleSession(self.connection, self.qmf_options) self.session.setAgentFilter(self.agent_filter) diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config index 0110c60aa2..1308df765d 100755 --- a/tools/src/py/qpid-config +++ b/tools/src/py/qpid-config @@ -18,12 +18,18 @@ # specific language governing permissions and limitations # under the License. # +import pdb import os from optparse import OptionParser, OptionGroup, IndentedHelpFormatter import sys import locale -from qmf.console import Session + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpid.messaging import Connection +from qpidtoollibs import BrokerAgent usage = """ Usage: qpid-config [OPTIONS] @@ -36,15 +42,16 @@ Usage: qpid-config [OPTIONS] qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key] <for type xml> [-f -|filename] <for type header> [all|any] k1=v1 [, k2=v2...] - qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]""" + qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key] + qpid-config [OPTIONS] reload-acl""" description = """ Examples: $ qpid-config add queue q $ qpid-config add exchange direct d -a localhost:5672 -$ qpid-config exchanges -a 10.1.1.7:10000 -$ qpid-config queues -a guest/guest@broker-host:10000 +$ qpid-config exchanges -b 10.1.1.7:10000 +$ qpid-config queues -b guest/guest@broker-host:10000 Add Exchange <type> values: @@ -55,7 +62,7 @@ Add Exchange <type> values: xml XML Exchange - allows content filtering using an XQuery -Queue Limit Actions +Queue Limit Actions: none (default) - Use broker's default policy reject - Reject enqueued messages @@ -63,12 +70,14 @@ Queue Limit Actions ring - Replace oldest unacquired message with new ring-strict - Replace oldest message, reject if oldest is acquired -Queue Ordering Policies +Replication levels: - fifo (default) - First in, first out - lvq - Last Value Queue ordering, allows queue browsing - lvq-no-browse - Last Value Queue ordering, browsing clients may lose data""" + none - no replication + configuration - replicate queue and exchange existence and bindings, but not messages. + all - replicate configuration and messages +""" +REPLICATE_LEVELS= ["none", "configuration", "all"] class Config: def __init__(self): @@ -77,8 +86,9 @@ class Config: self._connTimeout = 10 self._ignoreDefault = False self._altern_ex = None - self._passive = False self._durable = False + self._replicate = None + self._ha_admin = False self._clusterDurable = False self._if_empty = True self._if_unused = True @@ -87,8 +97,8 @@ class Config: self._maxQueueSize = None self._maxQueueCount = None self._limitPolicy = None - self._order = None self._msgSequence = False + self._lvq_key = None self._ive = False self._eventGeneration = None self._file = None @@ -100,6 +110,7 @@ class Config: self._msgGroupHeader = None self._sharedMsgGroup = False self._extra_arguments = [] + self._start_replica = None self._returnCode = 0 config = Config() @@ -110,8 +121,7 @@ MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" POLICY_TYPE = "qpid.policy_type" CLUSTER_DURABLE = "qpid.persist_last_node" -LVQ = "qpid.last_value_queue" -LVQNB = "qpid.last_value_queue_no_browse" +LVQ_KEY = "qpid.last_value_queue_key" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" @@ -121,14 +131,18 @@ FLOW_STOP_SIZE = "qpid.flow_stop_size" FLOW_RESUME_SIZE = "qpid.flow_resume_size" MSG_GROUP_HDR_KEY = "qpid.group_header_key" SHARED_MSG_GROUP = "qpid.shared_msg_group" +REPLICATE = "qpid.replicate" #There are various arguments to declare that have specific program #options in this utility. However there is now a generic mechanism for #passing arguments as well. The SPECIAL_ARGS list contains the #arguments for which there are specific program options defined #i.e. the arguments for which there is special processing on add and #list -SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, - MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP] +SPECIAL_ARGS=[ + FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE, + LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION, + FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, + MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE] class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -146,7 +160,7 @@ class JHelpFormatter(IndentedHelpFormatter): def Usage(): print usage - exit(-1) + sys.exit(-1) def OptionsAndArguments(argv): """ Set global variables for options, return arguments """ @@ -160,8 +174,8 @@ def OptionsAndArguments(argv): group1 = OptionGroup(parser, "General Options") group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list") - group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") + group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) @@ -171,8 +185,9 @@ def OptionsAndArguments(argv): group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") - group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.") group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.") + group2.add_option("--replicate", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'all').") + group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") parser.add_option_group(group2) group3 = OptionGroup(parser, "Options for Adding Queues") @@ -182,7 +197,7 @@ def OptionsAndArguments(argv): group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes") group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages") group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached") - group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy") + group3.add_option("--lvq-key", action="store", metavar="<key>", help="Last Value Queue key") group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>", help="Turn on sender flow control when the number of queued bytes exceeds this value.") @@ -198,6 +213,7 @@ def OptionsAndArguments(argv): help="Allow message group consumption across multiple consumers.") group3.add_option("--argument", dest="extra_arguments", action="append", default=[], metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments") + group3.add_option("--start-replica", metavar="<broker-url>", help="Start replication from the same-named queue at <broker-url>") # no option for declaring an exclusive queue - which can only be used by the session that creates it. parser.add_option_group(group3) @@ -224,10 +240,10 @@ def OptionsAndArguments(argv): except: args = encArgs - if opts.bindings: + if opts.recursive: config._recursive = True - if opts.broker_addr: - config._host = opts.broker_addr + if opts.broker: + config._host = opts.broker if opts.timeout is not None: config._connTimeout = opts.timeout if config._connTimeout == 0: @@ -236,10 +252,13 @@ def OptionsAndArguments(argv): config._ignoreDefault = True if opts.alternate_exchange: config._altern_ex = opts.alternate_exchange - if opts.passive: - config._passive = True if opts.durable: config._durable = True + if opts.replicate: + if not opts.replicate in REPLICATE_LEVELS: + raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS))) + config._replicate = opts.replicate + if opts.ha_admin: config._ha_admin = True if opts.cluster_durable: config._clusterDurable = True if opts.file: @@ -254,10 +273,10 @@ def OptionsAndArguments(argv): config._maxQueueCount = opts.max_queue_count if opts.limit_policy: config._limitPolicy = opts.limit_policy - if opts.order: - config._order = opts.order if opts.sequence: config._msgSequence = True + if opts.lvq_key: + config._lvq_key = opts.lvq_key if opts.ive: config._ive = True if opts.generate_queue_events: @@ -285,6 +304,8 @@ def OptionsAndArguments(argv): config._sharedMsgGroup = True if opts.extra_arguments: config._extra_arguments = opts.extra_arguments + if opts.start_replica: + config._start_replica = opts.start_replica return args @@ -331,27 +352,24 @@ def snarf_header_args(args): class BrokerManager: def __init__(self): self.brokerName = None - self.qmf = None + self.conn = None self.broker = None - self.mechanism = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a + client_properties={} + if config._ha_admin: client_properties["qpid.ha-admin"] = 1 + self.conn = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties) + self.broker = BrokerAgent(self.conn) def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) + if self.conn: + self.conn.close() def Overview(self): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - print "Total Exchanges: %d" % len (exchanges) + exchanges = self.broker.getAllExchanges() + queues = self.broker.getAllQueues() + print "Total Exchanges: %d" % len(exchanges) etype = {} for ex in exchanges: if ex.type not in etype: @@ -362,16 +380,16 @@ class BrokerManager: print "%15s: %d" % (typ, etype[typ]) print - print " Total Queues: %d" % len (queues) + print " Total Queues: %d" % len(queues) durable = 0 for queue in queues: if queue.durable: durable = durable + 1 print " durable: %d" % durable - print " non-durable: %d" % (len (queues) - durable) + print " non-durable: %d" % (len(queues) - durable) def ExchangeList(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() caption1 = "Type " caption2 = "Exchange Name" maxNameLen = len(caption2) @@ -398,22 +416,23 @@ class BrokerManager: args = ex.arguments if not args: args = {} if ex.durable: print "--durable", + if REPLICATE in args: print "--replicate=%s" % args[REPLICATE], if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", if ex.altExchange: - print "--alternate-exchange=%s" % ex._altExchange_.name, + print "--alternate-exchange=%s" % ex.altExchange, print def ExchangeListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for ex in exchanges: if config._ignoreDefault and not ex.name: continue if self.match(ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.getObjectId(): + if bind.exchangeRef == ex.name: qname = "<unknown>" queue = self.findById(queues, bind.queueRef) if queue != None: @@ -425,7 +444,7 @@ class BrokerManager: def QueueList(self, filter): - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + queues = self.broker.getAllQueues() caption = "Queue Name" maxNameLen = len(caption) found = False @@ -450,6 +469,7 @@ class BrokerManager: args = q.arguments if not args: args = {} if q.durable: print "--durable", + if REPLICATE in args: print "--replicate=%s" % args[REPLICATE], if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", if q.autoDelete: print "auto-del", if q.exclusive: print "excl", @@ -458,11 +478,10 @@ class BrokerManager: if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE], if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), - if LVQ in args and args[LVQ] == 1: print "--order lvq", - if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", + if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY], if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: - print "--alternate-exchange=%s" % q._altExchange_.name, + print "--alternate-exchange=%s" % q.altExchange, if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE], if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], @@ -472,14 +491,14 @@ class BrokerManager: print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for queue in queues: if self.match(queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.getObjectId(): + if bind.queueRef == queue.name: ename = "<unknown>" ex = self.findById(exchanges, bind.exchangeRef) if ex != None: @@ -508,16 +527,21 @@ class BrokerManager: declArgs[MSG_SEQUENCE] = 1 if config._ive: declArgs[IVE] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs) + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + if config._replicate: + declArgs[REPLICATE] = config._replicate + self.broker.addExchange(etype, ename, declArgs) + def DelExchange(self, args): if len(args) < 1: Usage() ename = args[0] - self.broker.getAmqpSession().exchange_delete(exchange=ename) + self.broker.delExchange(ename) + def AddQueue(self, args): if len(args) < 1: @@ -550,15 +574,10 @@ class BrokerManager: elif config._limitPolicy == "ring-strict": declArgs[POLICY_TYPE] = "ring_strict" - if config._clusterDurable: + if config._clusterDurable: declArgs[CLUSTER_DURABLE] = 1 - if config._order: - if config._order == "fifo": - pass - elif config._order == "lvq": - declArgs[LVQ] = 1 - elif config._order == "lvq-no-browse": - declArgs[LVQNB] = 1 + if config._lvq_key: + declArgs[LVQ_KEY] = config._lvq_key if config._eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration @@ -576,17 +595,21 @@ class BrokerManager: if config._sharedMsgGroup: declArgs[SHARED_MSG_GROUP] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) - + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + if config._replicate: + declArgs[REPLICATE] = config._replicate + self.broker.addQueue(qname, declArgs) + if config._start_replica: # Start replication + self.broker._method("replicate", {"broker":config._start_replica, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker") def DelQueue(self, args): if len(args) < 1: Usage() qname = args[0] - self.broker.getAmqpSession().queue_delete(queue=qname, if_empty=config._if_empty, if_unused=config._if_unused) + self.broker.delQueue(qname) def Bind(self, args): @@ -599,7 +622,7 @@ class BrokerManager: key = args[2] # query the exchange to determine its type. - res = self.broker.getAmqpSession().exchange_query(ename) + res = self.broker.getExchange(ename) # type of the xchg determines the processing of the rest of # argv. if it's an xml xchg, we want to find a file @@ -608,7 +631,7 @@ class BrokerManager: # map containing key/value pairs. if neither of those, extra # args are ignored. ok = True - _args = None + _args = {} if res.type == "xml": # this checks/imports the -f arg [ok, xquery] = snarf_xquery_args() @@ -622,10 +645,7 @@ class BrokerManager: if not ok: sys.exit(1) - self.broker.getAmqpSession().exchange_bind(queue=qname, - exchange=ename, - binding_key=key, - arguments=_args) + self.broker.bind(ename, qname, key, _args) def Unbind(self, args): if len(args) < 2: @@ -635,11 +655,20 @@ class BrokerManager: key = "" if len(args) > 2: key = args[2] - self.broker.getAmqpSession().exchange_unbind(queue=qname, exchange=ename, binding_key=key) + self.broker.unbind(ename, qname, key) + + def ReloadAcl(self): + try: + self.broker.reloadAclFile() + except Exception, e: + if str(e).find('No object found') != -1: + print "Failed: ACL Module Not Loaded in Broker" + else: + raise def findById(self, items, id): for item in items: - if item.getObjectId() == id: + if item.name == id: return item return None @@ -697,6 +726,8 @@ def main(argv=None): bm.Bind(args[1:]) elif cmd == "unbind": bm.Unbind(args[1:]) + elif cmd == "reload-acl": + bm.ReloadAcl() else: Usage() except KeyboardInterrupt: @@ -725,9 +756,9 @@ def main(argv=None): if e.__class__.__name__ != "Timeout": print "Failed: %s: %s" % (e.__class__.__name__, e) return 1 - return config._returnCode + if __name__ == "__main__": sys.exit(main()) diff --git a/tools/src/py/qpid-ha b/tools/src/py/qpid-ha new file mode 100755 index 0000000000..bd8040cfbe --- /dev/null +++ b/tools/src/py/qpid-ha @@ -0,0 +1,153 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import qmf.console, optparse, sys, time, os +from qpid.management import managementChannel, managementClient +from qpid.messaging import Connection +from qpid.messaging import Message as QpidMessage +from qpidtoollibs.broker import BrokerAgent +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +# QMF address for the HA broker object. +HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" + +class Command: + commands = {} + + def __init__(self, name, help, args=[]): + Command.commands[name] = self + self.name = name + self.args = args + usage="%s [options] %s\n\n%s"%(name, " ".join(args), help) + self.help = help + self.op=optparse.OptionParser(usage) + self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>") + + def execute(self): + opts, args = self.op.parse_args() + if len(args) != len(self.args)+1: + self.op.print_help() + raise Exception("Wrong number of arguments") + broker = opts.broker or "localhost:5672" + connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) + qmf_broker = BrokerAgent(connection) + ha_broker = qmf_broker.getHaBroker() + if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker) + try: return self.do_execute(qmf_broker, ha_broker, opts, args) + finally: connection.close() + + def do_execute(self, qmf_broker, opts, args): + raise Exception("Command '%s' is not yet implemented"%self.name) + +class PromoteCmd(Command): + def __init__(self): + Command.__init__(self, "promote","Promote broker from backup to primary") + def do_execute(self, qmf_broker, ha_broker, opts, args): + qmf_broker._method("promote", {}, HA_BROKER) +PromoteCmd() + +class StatusCmd(Command): + def __init__(self): + Command.__init__(self, "status", "Print HA status") + self.op.add_option( + "--expect", type="string", metavar="<status>", + help="Don't print status but return 0 if it matches <status>, 1 otherwise") + def do_execute(self, qmf_broker, ha_broker, opts, args): + if opts.expect: + if opts.expect != ha_broker.status: return 1 + else: + print ha_broker.status + return 0 +StatusCmd() + +class ReplicateCmd(Command): + def __init__(self): + Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"]) + def do_execute(self, qmf_broker, ha_broker, opts, args): + qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER) +ReplicateCmd() + +class SetCmd(Command): + def __init__(self): + Command.__init__(self, "set", "Set HA configuration settings") + def add(optname, metavar, type, help): + self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") + add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other") + add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers") + add("--backups", "<n>", "int", "Expect <n> backups to be running"), + + def do_execute(self, qmf_broker, ha_broker, opts, args): + if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER) + if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER) + if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) + +SetCmd() + +class QueryCmd(Command): + def __init__(self): + Command.__init__(self, "query", "Print HA configuration settings") + + def do_execute(self, qmf_broker, ha_broker, opts, args): + hb = ha_broker + for x in [("Status:", hb.status), + ("Brokers URL:", hb.brokers), + ("Public URL:", hb.publicBrokers), + ("Expected Backups:", hb.expectedBackups), + ("Replicate: ", hb.replicateDefault) + ]: + print "%-20s %s"%(x[0], x[1]) +QueryCmd() + +def print_usage(prog): + print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%prog + for name, command in Command.commands.iteritems(): + help = command.help + print " %-12s %s."%(name, help.split(".")[0]) + print "\nFor help with a command type: %s <command> --help\n"%prog + +def find_command(args): + """Find a command among the arguments and options""" + for arg in args: + if arg in Command.commands: + return Command.commands[arg] + return None + +def main(argv): + try: + args=argv[1:] + if args and args[0] == "--help-all": + for c in Command.commands.itervalues(): + c.op.print_help(); print + return 1 + command = find_command(args) + if not command: + print_usage(os.path.basename(argv[0])); + return 1; + if command.execute(): return 1 + except Exception, e: + print e + return 1 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/tools/src/py/qpid-ha-tool b/tools/src/py/qpid-ha-tool deleted file mode 100755 index 8e8107657c..0000000000 --- a/tools/src/py/qpid-ha-tool +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import qmf.console, optparse, sys -from qpid.management import managementChannel, managementClient -from qpid.messaging import Connection -from qpid.messaging import Message as QpidMessage -try: - from uuid import uuid4 -except ImportError: - from qpid.datatypes import uuid4 - -# Utility for doing fast qmf2 operations on a broker. -class QmfBroker(object): - def __init__(self, conn): - self.conn = conn - self.sess = self.conn.session() - self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ - str(uuid4()) - self.reply_rx = self.sess.receiver(self.reply_to) - self.reply_rx.capacity = 10 - self.tx = self.sess.sender("qmf.default.direct/broker") - self.next_correlator = 1 - - def close(self): - self.conn.close() - - def __repr__(self): - return "Qpid Broker: %s" % self.url - - def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker"): - props = {'method' : 'request', - 'qmf.opcode' : '_method_request', - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - - content = {'_object_id' : {'_object_name' : addr}, - '_method_name' : method, - '_arguments' : arguments} - - message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] == '_exception': - raise Exception("Exception from Agent: %r" % response.content['_values']) - if response.properties['qmf.opcode'] != '_method_response': - raise Exception("bad response: %r" % response.properties) - return response.content['_arguments'] - - def _sendRequest(self, opcode, content): - props = {'method' : 'request', - 'qmf.opcode' : opcode, - 'x-amqp-0-10.app-id' : 'qmf2'} - correlator = str(self.next_correlator) - self.next_correlator += 1 - message = QpidMessage(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") - self.tx.send(message) - return correlator - - def _doClassQuery(self, class_name): - query = {'_what' : 'OBJECT', - '_schema_id' : {'_class_name' : class_name}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item['_values']) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - return items - - def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): - query = {'_what' : 'OBJECT', - '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} - correlator = self._sendRequest('_query_request', query) - response = self.reply_rx.fetch(10) - if response.properties['qmf.opcode'] != '_query_response': - raise Exception("bad response") - items = [] - done = False - while not done: - for item in response.content: - items.append(item['_values']) - if 'partial' in response.properties: - response = self.reply_rx.fetch(10) - else: - done = True - if len(items) == 1: - return items[0] - return None - - def _getAllBrokerObjects(self, cls): - items = self._doClassQuery(cls.__name__.lower()) - objs = [] - for item in items: - objs.append(cls(self, item)) - return objs - - def _getBrokerObject(self, cls, name): - obj = self._doNameQuery(cls.__name__.lower(), name) - if obj: - return cls(self, obj) - return None - - -op=optparse.OptionParser(usage="Usage: %prog [options] [broker-address]") - -op.add_option("-p", "--promote", action="store_true", - help="Promote a backup broker to become the primary.") -op.add_option("-c", "--client-addresses", action="store", type="string", - help="Set list of addresses used by clients to connect to the HA cluster.") -op.add_option("-b", "--broker-addresses", action="store", type="string", - help="Set list of addresses used by HA brokers to connect to each other.") -op.add_option("-q", "--query", action="store_true", - help="Show the current HA settings on the broker.") - -def get_ha_broker(qmf_broker): - ha_brokers = qmf_broker._doClassQuery("habroker") - if (not ha_brokers): raise Exception("Broker does not have HA enabled.") - return ha_brokers[0] - -def main(argv): - try: - opts, args = op.parse_args(argv) - if len(args) >1: broker = args[1] - else: broker = "localhost:5672" - conn = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) - ha_broker = "org.apache.qpid.ha:habroker:ha-broker" - try: - qmf_broker = QmfBroker(conn) - get_ha_broker(qmf_broker) # Verify that HA is enabled - action=False - if opts.promote: - qmf_broker._method("promote", {}, ha_broker) - action=True - if opts.broker_addresses: - qmf_broker._method('setBrokerAddresses', {'brokerAddresses':opts.broker_addresses}, ha_broker) - action=True - if opts.client_addresses: - qmf_broker._method('setClientAddresses', {'clientAddresses':opts.client_addresses}, ha_broker) - action=True - if opts.query or not action: - hb = get_ha_broker(qmf_broker) - print "status=%s"%hb["status"] - print "broker-addresses=%s"%hb["brokerAddresses"] - print "client-addresses=%s"%hb["clientAddresses"] - return 0 - finally: - conn.close() # Avoid errors shutting down threads. - except Exception, e: - raise # FIXME aconway 2012-01-31: - print e - return 1 - -if __name__ == "__main__": - sys.exit(main(sys.argv)) diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route index f90416d7b0..0316c24322 100755 --- a/tools/src/py/qpid-route +++ b/tools/src/py/qpid-route @@ -62,6 +62,7 @@ class Config: self._ack = 0 self._connTimeout = 10 self._client_sasl_mechanism = None + self._ha_admin = False config = Config() @@ -96,7 +97,7 @@ def OptionsAndArguments(argv): parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") - + parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") opts, encArgs = parser.parse_args(args=argv) try: @@ -128,6 +129,9 @@ def OptionsAndArguments(argv): if opts.transport: config._transport = opts.transport + if opts.ha_admin: + config._ha_admin = True + if opts.ack: config._ack = opts.ack @@ -143,7 +147,9 @@ class RouteManager: self.local = BrokerURL(localBroker) self.remote = None self.qmf = Session() - self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism) + client_properties = {} + if config._ha_admin: client_properties["qpid.ha-admin"] = 1 + self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism, client_properties=client_properties) self.broker._waitForStable() self.agent = self.broker.getBrokerAgent() diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat index bb094554e6..5a816baf6e 100755 --- a/tools/src/py/qpid-stat +++ b/tools/src/py/qpid-stat @@ -30,8 +30,8 @@ from qpid.messaging import Connection home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) sys.path.append(os.path.join(home, "python")) -from qpidtoollibs.broker import BrokerAgent -from qpidtoollibs.disp import Display, Header, Sorter +from qpidtoollibs import BrokerAgent +from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong class Config: @@ -42,8 +42,8 @@ class Config: self._limit = 50 self._increasing = False self._sortcol = None - self._details = None self._sasl_mechanism = None + self._ha_admin = False config = Config() @@ -52,42 +52,45 @@ def OptionsAndArguments(argv): global config - parser = OptionParser(usage="usage: %prog [options] BROKER", - description="Example: $ qpid-stat -q broker-host:10000") + parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]") group1 = OptionGroup(parser, "General Options") - group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>", + help="URL of the broker to query") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", + help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") - group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show") - group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") - group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") - group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") - group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") - group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") + group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show") group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") - group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.") + parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) if not opts.show: - parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") + parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help") config._types = opts.show config._sortcol = opts.sort_by + config._host = opts.broker config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit config._sasl_mechanism = opts.sasl_mechanism - config._detail = opts.detail - - if args: - config._host = args[0] + config._ha_admin = opts.ha_admin return args @@ -118,24 +121,24 @@ class IpAddr: class BrokerManager: def __init__(self): - self.brokerName = None - self.connections = [] - self.brokers = [] - self.cluster = None + self.brokerName = None + self.connection = None + self.broker = None + self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.connections.append(Connection(self.url, sasl_mechanism=mechanism)) - self.connections[0].open() - self.brokers.append(BrokerAgent(self.connections[0])) + client_properties={} + if config._ha_admin: client_properties["qpid.ha-admin"] = 1 + self.connection = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties) + self.broker = BrokerAgent(self.connection) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: - for conn in self.connections: - conn.close() + connection.close() except: pass @@ -175,7 +178,7 @@ class BrokerManager: hosts.append(bestUrl) return hosts - def displayBroker(self, subs): + def displayBroker(self): disp = Display(prefix=" ") heads = [] heads.append(Header('uptime', Header.DURATION)) @@ -184,7 +187,7 @@ class BrokerManager: heads.append(Header('exchanges', Header.COMMAS)) heads.append(Header('queues', Header.COMMAS)) rows = [] - broker = self.brokers[0].getBroker() + broker = self.broker.getBroker() connections = self.getConnectionMap() sessions = self.getSessionMap() exchanges = self.getExchangeMap() @@ -229,7 +232,7 @@ class BrokerManager: disp.formattedTable('Aggregate Broker Statistics:', heads, rows) - def displayConn(self, subs): + def displayConn(self): disp = Display(prefix=" ") heads = [] heads.append(Header('client-addr')) @@ -241,8 +244,8 @@ class BrokerManager: heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] - connections = self.brokers[0].getAllConnections() - broker = self.brokers[0].getBroker() + connections = self.broker.getAllConnections() + broker = self.broker.getBroker() for conn in connections: row = [] row.append(conn.address) @@ -262,10 +265,10 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displaySession(self, subs): + def displaySession(self): disp = Display(prefix=" ") - def displayExchange(self, subs): + def displayExchange(self): disp = Display(prefix=" ") heads = [] heads.append(Header("exchange")) @@ -279,7 +282,7 @@ class BrokerManager: heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() for ex in exchanges: row = [] row.append(ex.name) @@ -301,7 +304,7 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueues(self, subs): + def displayQueues(self): disp = Display(prefix=" ") heads = [] heads.append(Header("queue")) @@ -317,7 +320,7 @@ class BrokerManager: heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() for q in queues: row = [] row.append(q.name) @@ -341,11 +344,67 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueue(self, subs): + + def displayQueue(self, name): + queue = self.broker.getQueue(name) + if not queue: + print "Queue '%s' not found" % name + return + disp = Display(prefix=" ") heads = [] + heads.append(Header('Name')) + heads.append(Header('Durable', Header.YN)) + heads.append(Header('AutoDelete', Header.YN)) + heads.append(Header('Exclusive', Header.YN)) + heads.append(Header('FlowStopped', Header.YN)) + heads.append(Header('FlowStoppedCount', Header.COMMAS)) + heads.append(Header('Consumers', Header.COMMAS)) + heads.append(Header('Bindings', Header.COMMAS)) + rows = [] + rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive, + queue.flowStopped, queue.flowStoppedCount, + queue.consumerCount, queue.bindingCount]) + disp.formattedTable("Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Property')) + heads.append(Header('Value')) + rows = [] + rows.append(['arguments', queue.arguments]) + rows.append(['alt-exchange', queue.altExchange]) + disp.formattedTable("Optional Properties:", heads, rows) + print - def displaySubscriptions(self, subs): + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', queue.msgDepth, queue.byteDepth]) + rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues]) + rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues]) + rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues]) + rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues]) + rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues]) + rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues]) + rows.append(['acquires', queue.acquires, None]) + rows.append(['releases', queue.releases, None]) + rows.append(['discards-ttl-expired', queue.discardsTtl, None]) + rows.append(['discards-limit-overflow', queue.discardsOverflow, None]) + rows.append(['discards-ring-overflow', queue.discardsRing, None]) + rows.append(['discards-lvq-replace', queue.discardsLvq, None]) + rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None]) + rows.append(['discards-purged', queue.discardsPurge, None]) + rows.append(['reroutes', queue.reroutes, None]) + disp.formattedTable("Statistics:", heads, rows) + + + def displaySubscriptions(self): disp = Display(prefix=" ") heads = [] heads.append(Header("subscr")) @@ -359,7 +418,7 @@ class BrokerManager: heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] - subscriptions = self.brokers[0].getAllSubscriptions() + subscriptions = self.broker.getAllSubscriptions() sessions = self.getSessionMap() connections = self.getConnectionMap() for s in subscriptions: @@ -388,59 +447,75 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayMemory(self, unused): + def displayMemory(self): disp = Display(prefix=" ") heads = [Header('Statistic'), Header('Value', Header.COMMAS)] rows = [] - memory = self.brokers[0].getMemory() + memory = self.broker.getMemory() for k,v in memory.values.items(): if k != 'name': rows.append([k, v]) disp.formattedTable('Broker Memory Statistics:', heads, rows) + def displayAcl(self): + acl = self.broker.getAcl() + if not acl: + print "ACL Policy Module is not installed" + return + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value')] + rows = [] + rows.append(['policy-file', acl.policyFile]) + rows.append(['enforcing', YN(acl.enforcingAcl)]) + rows.append(['has-transfer-acls', YN(acl.transferAcl)]) + rows.append(['last-acl-load', TimeLong(acl.lastAclLoad)]) + rows.append(['acl-denials', Commas(acl.aclDenyCount)]) + disp.formattedTable('ACL Policy Statistics:', heads, rows) + def getExchangeMap(self): - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() emap = {} for e in exchanges: emap[e.name] = e return emap def getQueueMap(self): - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() qmap = {} for q in queues: qmap[q.name] = q return qmap def getSessionMap(self): - sessions = self.brokers[0].getAllSessions() + sessions = self.broker.getAllSessions() smap = {} for s in sessions: smap[s.name] = s return smap def getConnectionMap(self): - connections = self.brokers[0].getAllConnections() + connections = self.broker.getAllConnections() cmap = {} for c in connections: cmap[c.address] = c return cmap - def displayMain(self, main, subs): - if main == 'b': self.displayBroker(subs) - elif main == 'c': self.displayConn(subs) - elif main == 's': self.displaySession(subs) - elif main == 'e': self.displayExchange(subs) + def displayMain(self, names, main): + if main == 'g': self.displayBroker() + elif main == 'c': self.displayConn() + elif main == 's': self.displaySession() + elif main == 'e': self.displayExchange() elif main == 'q': - if config._detail: - self.displayQueue(subs, config._detail) + if len(names) >= 1: + self.displayQueue(names[0]) else: - self.displayQueues(subs) - elif main == 'u': self.displaySubscriptions(subs) - elif main == 'm': self.displayMemory(subs) + self.displayQueues() + elif main == 'u': self.displaySubscriptions() + elif main == 'm': self.displayMemory() + elif main == 'acl': self.displayAcl() - def display(self): - self.displayMain(config._types[0], config._types[1:]) + def display(self, names): + self.displayMain(names, config._types) def main(argv=None): @@ -450,7 +525,7 @@ def main(argv=None): try: bm.SetBroker(config._host, config._sasl_mechanism) - bm.display() + bm.display(args) bm.Disconnect() return 0 except KeyboardInterrupt: diff --git a/tools/src/py/qpidtoollibs/__init__.py b/tools/src/py/qpidtoollibs/__init__.py index 31d5a2ef58..2815bac22f 100644 --- a/tools/src/py/qpidtoollibs/__init__.py +++ b/tools/src/py/qpidtoollibs/__init__.py @@ -16,3 +16,7 @@ # specific language governing permissions and limitations # under the License. # + +from qpidtoollibs.broker import * +from qpidtoollibs.disp import * + diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py index 6a380caf8d..0bae786306 100644 --- a/tools/src/py/qpidtoollibs/broker.py +++ b/tools/src/py/qpidtoollibs/broker.py @@ -24,6 +24,9 @@ except ImportError: from qpid.datatypes import uuid4 class BrokerAgent(object): + """ + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection. + """ def __init__(self, conn): self.conn = conn self.sess = self.conn.session() @@ -35,6 +38,9 @@ class BrokerAgent(object): self.next_correlator = 1 def close(self): + """ + Close the proxy session. This will not affect the connection used in creating the object. + """ self.sess.close() def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): @@ -89,9 +95,8 @@ class BrokerAgent(object): self.sess.acknowledge() return items - def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): - query = {'_what' : 'OBJECT', - '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + def _doNameQuery(self, object_id): + query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}} correlator = self._sendRequest('_query_request', query) response = self.reply_rx.fetch(10) if response.properties['qmf.opcode'] != '_query_response': @@ -116,65 +121,74 @@ class BrokerAgent(object): for item in items: objs.append(cls(self, item)) return objs - - def _getBrokerObject(self, cls, name): - obj = self._doNameQuery(cls.__name__.lower(), name) + + def _getBrokerObject(self, cls, oid): + obj = self._doNameQuery(oid) if obj: return cls(self, obj) return None - def getCluster(self): - return self._getAllBrokerObjects(Cluster) - - def getBroker(self): + def _getSingleObject(self, cls): # # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because # of a bug that used to be in the broker whereby by-name queries did not return the # object timestamps. # - brokers = self._getAllBrokerObjects(Broker) - if brokers: - return brokers[0] + objects = self._getAllBrokerObjects(cls) + if objects: return objects[0] return None - def getMemory(self): - return self._getAllBrokerObjects(Memory)[0] + def getBroker(self): + """ + Get the Broker object that contains broker-scope statistics and operations. + """ + return self._getSingleObject(Broker) + + + def getCluster(self): + return self._getSingleObject(Cluster) + + def getHaBroker(self): + return self._getSingleObject(HaBroker) def getAllConnections(self): return self._getAllBrokerObjects(Connection) - def getConnection(self, name): - return self._getBrokerObject(Connection, name) + def getConnection(self, oid): + return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid) def getAllSessions(self): return self._getAllBrokerObjects(Session) - def getSession(self, name): - return self._getBrokerObject(Session, name) + def getSession(self, oid): + return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid) def getAllSubscriptions(self): return self._getAllBrokerObjects(Subscription) - def getSubscription(self, name): - return self._getBrokerObject(Subscription, name) + def getSubscription(self, oid): + return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid) def getAllExchanges(self): return self._getAllBrokerObjects(Exchange) def getExchange(self, name): - return self._getBrokerObject(Exchange, name) + return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name) def getAllQueues(self): return self._getAllBrokerObjects(Queue) def getQueue(self, name): - return self._getBrokerObject(Queue, name) + return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name) def getAllBindings(self): return self._getAllBrokerObjects(Binding) - def getBinding(self, exchange=None, queue=None): - pass + def getAllLinks(self): + return self._getAllBrokerObjects(Link) + + def getAcl(self): + return self._getSingleObject(Acl) def echo(self, sequence, body): """Request a response to test the path to the management broker""" @@ -204,23 +218,69 @@ class BrokerAgent(object): """Get the message timestamping configuration""" pass -# def addExchange(self, exchange_type, name, **kwargs): -# pass - -# def delExchange(self, name): -# pass - -# def addQueue(self, name, **kwargs): -# pass - -# def delQueue(self, name): -# pass - -# def bind(self, exchange, queue, key, **kwargs): -# pass - -# def unbind(self, exchange, queue, key, **kwargs): -# pass + def addExchange(self, exchange_type, name, options={}, **kwargs): + properties = {} + properties['exchange-type'] = exchange_type + for k,v in options.items(): + properties[k] = v + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'exchange', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delExchange(self, name): + args = {'type': 'exchange', 'name': name} + self._method('delete', args) + + def addQueue(self, name, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'queue', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delQueue(self, name): + args = {'type': 'queue', 'name': name} + self._method('delete', args) + + def bind(self, exchange, queue, key, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'properties': properties, + 'strict': True} + self._method('create', args) + + def unbind(self, exchange, queue, key, **kwargs): + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'strict': True} + self._method('delete', args) + + def reloadAclFile(self): + self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def acl_lookup(self, userName, action, aclObj, aclObjName, propMap): + args = {'userId': userName, + 'action': action, + 'object': aclObj, + 'objectName': aclObjName, + 'propertyMap': propMap} + return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def acl_lookupPublish(self, userName, exchange, key): + args = {'userId': userName, + 'exchangeName': exchange, + 'routingKey': key} + return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") def create(self, _type, name, properties, strict): """Create an object of the specified type""" @@ -230,9 +290,9 @@ class BrokerAgent(object): """Delete an object of the specified type""" pass - def query(self, _type, name): + def query(self, _type, oid): """Query the current state of an object""" - return self._getBrokerObject(self, _type, name) + return self._getBrokerObject(self, _type, oid) class BrokerObject(object): @@ -255,6 +315,9 @@ class BrokerObject(object): return full_name[colon+1:] return value + def getObjectId(self): + return self.content['_object_id']['_object_name'] + def getAttributes(self): return self.values @@ -271,7 +334,7 @@ class BrokerObject(object): """ Reload the property values from the agent. """ - refreshed = self.broker._getBrokerObject(self.__class__, self.name) + refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId()) if refreshed: self.content = refreshed.content self.values = self.content['_values'] @@ -282,6 +345,14 @@ class Broker(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) +class Cluster(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class HaBroker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + class Memory(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) @@ -328,3 +399,10 @@ class Queue(BrokerObject): self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, "org.apache.qpid.broker:queue:%s" % self.name) +class Link(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Acl(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) diff --git a/tools/src/py/qpidtoollibs/disp.py b/tools/src/py/qpidtoollibs/disp.py index cb7d3da306..a0c77370a5 100644 --- a/tools/src/py/qpidtoollibs/disp.py +++ b/tools/src/py/qpidtoollibs/disp.py @@ -21,6 +21,31 @@ from time import strftime, gmtime +def YN(val): + if val: + return 'Y' + return 'N' + +def Commas(value): + sval = str(value) + result = "" + while True: + if len(sval) == 0: + return result + left = sval[:-3] + right = sval[-3:] + result = right + result + if len(left) > 0: + result = ',' + result + sval = left + +def TimeLong(value): + return strftime("%c", gmtime(value / 1000000000)) + +def TimeShort(value): + return strftime("%X", gmtime(value / 1000000000)) + + class Header: """ """ NONE = 1 @@ -59,9 +84,9 @@ class Header: return 'Y' return '' if self.format == Header.TIME_LONG: - return strftime("%c", gmtime(value / 1000000000)) + return TimeLong(value) if self.format == Header.TIME_SHORT: - return strftime("%X", gmtime(value / 1000000000)) + return TimeShort(value) if self.format == Header.DURATION: if value < 0: value = 0 sec = value / 1000000000 @@ -78,17 +103,7 @@ class Header: result += "%ds" % (sec % 60) return result if self.format == Header.COMMAS: - sval = str(value) - result = "" - while True: - if len(sval) == 0: - return result - left = sval[:-3] - right = sval[-3:] - result = right + result - if len(left) > 0: - result = ',' + result - sval = left + return Commas(value) except: return "?" |