summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /tools
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tools')
-rwxr-xr-xtools/setup.py2
-rw-r--r--tools/src/py/.gitignore1
-rwxr-xr-xtools/src/py/qpid-cluster27
-rwxr-xr-xtools/src/py/qpid-config25
-rwxr-xr-xtools/src/py/qpid-ha86
-rwxr-xr-xtools/src/py/qpid-printevents126
-rwxr-xr-xtools/src/py/qpid-queue-stats14
-rwxr-xr-xtools/src/py/qpid-route31
-rwxr-xr-xtools/src/py/qpid-stat56
-rwxr-xr-xtools/src/py/qpid-tool10
-rw-r--r--tools/src/py/qpidtoollibs/broker.py46
-rw-r--r--tools/src/py/qpidtoollibs/disp.py12
12 files changed, 314 insertions, 122 deletions
diff --git a/tools/setup.py b/tools/setup.py
index 302c25502f..c9dc21c620 100755
--- a/tools/setup.py
+++ b/tools/setup.py
@@ -20,7 +20,7 @@
from distutils.core import setup
setup(name="qpid-tools",
- version="0.17",
+ version="0.19",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
package_dir={'' : 'src/py'},
diff --git a/tools/src/py/.gitignore b/tools/src/py/.gitignore
index 97cb05dc36..b775fd83a1 100644
--- a/tools/src/py/.gitignore
+++ b/tools/src/py/.gitignore
@@ -19,4 +19,5 @@
# with the License. You may obtain a copy of the License at
/qpid-clusterc
/qpid-configc
+/qpid-hac
/qpid-routec
diff --git a/tools/src/py/qpid-cluster b/tools/src/py/qpid-cluster
index d4f9391dcf..7d800b52fb 100755
--- a/tools/src/py/qpid-cluster
+++ b/tools/src/py/qpid-cluster
@@ -64,17 +64,19 @@ class IpAddr:
return bestAddr
class BrokerManager:
- def __init__(self, config):
- self.config = config
- self.brokerName = None
- self.qmf = None
- self.broker = None
- self.brokers = []
+ def __init__(self, config, conn_options):
+ self.config = config
+ self.cert = None
+ self.conn_options = conn_options
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
def SetBroker(self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
+ self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout, **self.conn_options)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == '0':
@@ -240,6 +242,8 @@ def main(argv=None):
description="Example: $ qpid-cluster -C broker-host:10000")
parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
parser.add_option("-C", "--all-connections", action="store_true", default=False, help="View client connections to all cluster members")
parser.add_option("-c", "--connections", metavar="ID", help="View client connections to specified member")
parser.add_option("-d", "--del-connection", metavar="HOST:PORT", help="Disconnect a client connection")
@@ -280,7 +284,13 @@ def main(argv=None):
config._force = opts.force
config._numeric = opts.numeric
- bm = BrokerManager(config)
+ conn_options = {}
+ if opts.sasl_mechanism:
+ conn_options['mechanisms'] = opts.sasl_mechanism
+ if opts.ssl_certificate:
+ conn_options['ssl_certfile'] = opts.ssl_certificate
+
+ bm = BrokerManager(config, conn_options)
try:
bm.SetBroker(config._host)
@@ -303,7 +313,6 @@ def main(argv=None):
bm.Disconnect()
except Exception, e:
- raise
print str(e)
return 1
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
index 1308df765d..df43b7ea4e 100755
--- a/tools/src/py/qpid-config
+++ b/tools/src/py/qpid-config
@@ -88,7 +88,6 @@ class Config:
self._altern_ex = None
self._durable = False
self._replicate = None
- self._ha_admin = False
self._clusterDurable = False
self._if_empty = True
self._if_unused = True
@@ -102,7 +101,6 @@ class Config:
self._ive = False
self._eventGeneration = None
self._file = None
- self._sasl_mechanism = None
self._flowStopCount = None
self._flowResumeCount = None
self._flowStopSize = None
@@ -114,6 +112,7 @@ class Config:
self._returnCode = 0
config = Config()
+conn_options = {}
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
@@ -177,6 +176,9 @@ def OptionsAndArguments(argv):
group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list")
group1.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
parser.add_option_group(group1)
group_ls = OptionGroup(parser, "Options for Listing Exchanges and Queues")
@@ -187,7 +189,6 @@ def OptionsAndArguments(argv):
group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
group2.add_option("--replicate", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'all').")
- group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
parser.add_option_group(group2)
group3 = OptionGroup(parser, "Options for Adding Queues")
@@ -306,6 +307,16 @@ def OptionsAndArguments(argv):
config._extra_arguments = opts.extra_arguments
if opts.start_replica:
config._start_replica = opts.start_replica
+
+ if opts.sasl_mechanism:
+ conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+ if opts.ssl_certificate:
+ conn_options['ssl_certfile'] = opts.ssl_certificate
+ if opts.ssl_key:
+ conn_options['ssl_key'] = opts.ssl_key
+ if opts.ha_admin:
+ conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+
return args
@@ -355,11 +366,9 @@ class BrokerManager:
self.conn = None
self.broker = None
- def SetBroker(self, brokerUrl, mechanism):
+ def SetBroker(self, brokerUrl):
self.url = brokerUrl
- client_properties={}
- if config._ha_admin: client_properties["qpid.ha-admin"] = 1
- self.conn = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties)
+ self.conn = Connection.establish(self.url, **conn_options)
self.broker = BrokerAgent(self.conn)
def Disconnect(self):
@@ -690,7 +699,7 @@ def main(argv=None):
bm = BrokerManager()
try:
- bm.SetBroker(config._host, config._sasl_mechanism)
+ bm.SetBroker(config._host)
if len(args) == 0:
bm.Overview()
else:
diff --git a/tools/src/py/qpid-ha b/tools/src/py/qpid-ha
index bd8040cfbe..5b701a1fb4 100755
--- a/tools/src/py/qpid-ha
+++ b/tools/src/py/qpid-ha
@@ -19,8 +19,7 @@
# under the License.
#
-import qmf.console, optparse, sys, time, os
-from qpid.management import managementChannel, managementClient
+import optparse, sys, time, os
from qpid.messaging import Connection
from qpid.messaging import Message as QpidMessage
from qpidtoollibs.broker import BrokerAgent
@@ -32,29 +31,44 @@ except ImportError:
# QMF address for the HA broker object.
HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
+class ExitStatus(Exception):
+ """Raised if a command want's a non-0 exit status from the script"""
+ def __init__(self, status): self.status = status
+
class Command:
commands = {}
- def __init__(self, name, help, args=[]):
+ def __init__(self, name, help, arg_names=[]):
Command.commands[name] = self
self.name = name
- self.args = args
- usage="%s [options] %s\n\n%s"%(name, " ".join(args), help)
+ self.arg_names = arg_names
+ usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
self.help = help
self.op=optparse.OptionParser(usage)
- self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>")
-
- def execute(self):
- opts, args = self.op.parse_args()
- if len(args) != len(self.args)+1:
+ self.op.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ self.op.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ self.op.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ self.op.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
+
+ def execute(self, args):
+ opts, args = self.op.parse_args(args)
+ if len(args) != len(self.arg_names)+1:
self.op.print_help()
raise Exception("Wrong number of arguments")
- broker = opts.broker or "localhost:5672"
- connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
+ conn_options = {}
+ if opts.sasl_mechanism:
+ conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+ if opts.ssl_certificate:
+ conn_options['ssl_certfile'] = opts.ssl_certificate
+ if opts.ssl_key:
+ conn_options['ssl_key'] = opts.ssl_key
+ conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+
+ connection = Connection.establish(opts.broker, **conn_options)
qmf_broker = BrokerAgent(connection)
ha_broker = qmf_broker.getHaBroker()
- if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker)
- try: return self.do_execute(qmf_broker, ha_broker, opts, args)
+ if not ha_broker: raise Exception("HA module is not loaded on broker at %s" % opts.broker)
+ try: self.do_execute(qmf_broker, ha_broker, opts, args)
finally: connection.close()
def do_execute(self, qmf_broker, opts, args):
@@ -75,10 +89,10 @@ class StatusCmd(Command):
help="Don't print status but return 0 if it matches <status>, 1 otherwise")
def do_execute(self, qmf_broker, ha_broker, opts, args):
if opts.expect:
- if opts.expect != ha_broker.status: return 1
+ if opts.expect != ha_broker.status: raise ExitStatus(1)
else:
print ha_broker.status
- return 0
+
StatusCmd()
class ReplicateCmd(Command):
@@ -93,30 +107,31 @@ class SetCmd(Command):
Command.__init__(self, "set", "Set HA configuration settings")
def add(optname, metavar, type, help):
self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store")
- add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other")
- add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers")
+ add("--brokers-url", "<url>", "string", "URL with address of each broker in the cluster. Used by brokers to connect to each other.")
+ add("--public-url", "<url>", "string", "URL advertised to clients to connect to the cluster. May be a list or a VIP.")
add("--backups", "<n>", "int", "Expect <n> backups to be running"),
def do_execute(self, qmf_broker, ha_broker, opts, args):
- if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER)
- if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER)
+ if (opts.brokers_url): qmf_broker._method("setBrokersUrl", {"url":opts.brokers_url}, HA_BROKER)
+ if (opts.public_url): qmf_broker._method("setPublicUrl", {"url":opts.public_url}, HA_BROKER)
if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER)
SetCmd()
class QueryCmd(Command):
def __init__(self):
- Command.__init__(self, "query", "Print HA configuration settings")
+ Command.__init__(self, "query", "Print HA configuration and status")
def do_execute(self, qmf_broker, ha_broker, opts, args):
hb = ha_broker
for x in [("Status:", hb.status),
- ("Brokers URL:", hb.brokers),
- ("Public URL:", hb.publicBrokers),
+ ("Brokers URL:", hb.brokersUrl),
+ ("Public URL:", hb.publicUrl),
("Expected Backups:", hb.expectedBackups),
("Replicate: ", hb.replicateDefault)
]:
print "%-20s %s"%(x[0], x[1])
+
QueryCmd()
def print_usage(prog):
@@ -133,18 +148,25 @@ def find_command(args):
return Command.commands[arg]
return None
-def main(argv):
- try:
- args=argv[1:]
- if args and args[0] == "--help-all":
- for c in Command.commands.itervalues():
- c.op.print_help(); print
- return 1
+def main_except(argv):
+ """This version of main raises exceptions"""
+ args=argv[1:]
+ if args and args[0] == "--help-all":
+ for c in Command.commands.itervalues():
+ c.op.print_help(); print
+ else:
command = find_command(args)
if not command:
print_usage(os.path.basename(argv[0]));
- return 1;
- if command.execute(): return 1
+ raise Exception("Command not found")
+ command.execute(args)
+
+def main(argv):
+ try:
+ main_except(argv)
+ return 0
+ except ExitStatus, e:
+ return e.status
except Exception, e:
print e
return 1
diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents
index d56d2899b1..0d0f1a0782 100755
--- a/tools/src/py/qpid-printevents
+++ b/tools/src/py/qpid-printevents
@@ -21,34 +21,84 @@
import os
import optparse
-from optparse import IndentedHelpFormatter
import sys
-import socket
-from time import time, strftime, gmtime, sleep
-from qmf.console import Console, Session
+from optparse import IndentedHelpFormatter
+from time import time, strftime, gmtime, sleep
+from threading import Lock, Condition, Thread
+from qpid.messaging import Connection
+import qpid.messaging.exceptions
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
-class EventConsole(Console):
- def event(self, broker, event):
- print event
- sys.stdout.flush()
+from qpidtoollibs.broker import EventHelper
- def brokerConnected(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
- sys.stdout.flush()
- def brokerConnectionFailed(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
- sys.stdout.flush()
+class Printer(object):
+ """
+ This class serializes printed lines so that events coming from different
+ threads don't overlap each other.
+ """
+ def __init__(self):
+ self.lock = Lock()
- def brokerDisconnected(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
+ def pr(self, text):
+ self.lock.acquire()
+ try:
+ print text
+ finally:
+ self.lock.release()
sys.stdout.flush()
+
+
+class EventReceiver(Thread):
+ """
+ One instance of this class is created for each broker that is being monitored.
+ This class does not use the "reconnect" option because it needs to report as
+ events when the connection is established and when it's lost.
+ """
+ def __init__(self, printer, url, options):
+ Thread.__init__(self)
+ self.printer = printer
+ self.url = url
+ self.options = options
+ self.running = True
+ self.helper = EventHelper()
+
+ def cancel(self):
+ self.running = False
+
+ def run(self):
+ isOpen = False
+ while self.running:
+ try:
+ conn = Connection.establish(self.url, **options)
+ isOpen = True
+ self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url)
+
+ sess = conn.session()
+ rx = sess.receiver(self.helper.eventAddress())
+
+ while self.running:
+ try:
+ msg = rx.fetch(1)
+ event = self.helper.event(msg)
+ self.printer.pr(event.__repr__())
+ sess.acknowledge()
+ except qpid.messaging.exceptions.Empty:
+ pass
+
+ except Exception, e:
+ if isOpen:
+ self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url)
+ isOpen = False
+ sleep(1)
+
class JHelpFormatter(IndentedHelpFormatter):
- """Format usage and description without stripping newlines from usage strings
"""
-
+ Format usage and description without stripping newlines from usage strings
+ """
def format_usage(self, usage):
return usage
@@ -82,21 +132,42 @@ def main(argv=None):
p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter())
p.add_option("--heartbeats", action="store_true", default=False, help="Use heartbeats.")
p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ p.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ p.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ p.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
options, arguments = p.parse_args(args=argv)
if len(arguments) == 0:
arguments.append("localhost")
- console = EventConsole()
- session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True)
- brokers = []
+ brokers = []
+ conn_options = {}
+ props = {}
+ printer = Printer()
+
+ if options.sasl_mechanism:
+ conn_options['sasl_mechanisms'] = options.sasl_mechanism
+ if options.ssl_certificate:
+ conn_options['ssl_certfile'] = options.ssl_certificate
+ if options.ssl_key:
+ conn_options['ssl_key'] = options.ssl_key
+ if options.ha_admin:
+ props['qpid.ha-admin'] = 1
+ if options.heartbeats:
+ props['heartbeat'] = 5
+
+ if len(props) > 0:
+ conn_options['client_properties'] = props
+
try:
try:
for host in arguments:
- brokers.append(session.addBroker(host, None, options.sasl_mechanism))
+ er = EventReceiver(printer, host, conn_options)
+ brokers.append(er)
+ er.start()
- while (True):
- sleep(10)
+ while (True):
+ sleep(10)
except KeyboardInterrupt:
print
@@ -106,9 +177,10 @@ def main(argv=None):
print "Failed: %s - %s" % (e.__class__.__name__, e)
return 1
finally:
- while len(brokers):
- b = brokers.pop()
- session.delBroker(b)
+ for b in brokers:
+ b.cancel()
+ for b in brokers:
+ b.join()
if __name__ == '__main__':
sys.exit(main())
diff --git a/tools/src/py/qpid-queue-stats b/tools/src/py/qpid-queue-stats
index 562ccce32d..f68609aed8 100755
--- a/tools/src/py/qpid-queue-stats
+++ b/tools/src/py/qpid-queue-stats
@@ -32,13 +32,13 @@ from qpid.connection import Connection, ConnectionFailed
from time import sleep
class BrokerManager(Console):
- def __init__(self, host, mechanism):
+ def __init__(self, host, conn_options):
self.url = host
self.objects = {}
self.filter = None
self.session = Session(self, rcvEvents=False, rcvHeartbeats=False,
userBindings=True, manageConnections=True)
- self.broker = self.session.addBroker(self.url, None, mechanism)
+ self.broker = self.session.addBroker(self.url, **conn_options)
self.firstError = True
def setFilter(self,filter):
@@ -126,17 +126,23 @@ def main(argv=None):
p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
-
+ p.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
options, arguments = p.parse_args(args=argv)
+ conn_options = {}
+ if options.sasl_mechanism:
+ conn_options['mechanisms'] = options.sasl_mechanism
+ if options.ssl_certificate:
+ conn_options['ssl_certfile'] = options.ssl_certificate
+
host = options.broker_address
filter = []
if options.filter != None:
for s in options.filter.split(","):
filter.append(re.compile(s))
- bm = BrokerManager(host, options.sasl_mechanism)
+ bm = BrokerManager(host, conn_options)
bm.setFilter(filter)
bm.Display()
diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route
index 0316c24322..00c7c59189 100755
--- a/tools/src/py/qpid-route
+++ b/tools/src/py/qpid-route
@@ -53,16 +53,15 @@ def Usage():
class Config:
def __init__(self):
- self._verbose = False
- self._quiet = False
- self._durable = False
- self._dellink = False
- self._srclocal = False
- self._transport = "tcp"
- self._ack = 0
- self._connTimeout = 10
- self._client_sasl_mechanism = None
- self._ha_admin = False
+ self._verbose = False
+ self._quiet = False
+ self._durable = False
+ self._dellink = False
+ self._srclocal = False
+ self._transport = "tcp"
+ self._ack = 0
+ self._connTimeout = 10
+ self._conn_options = {}
config = Config()
@@ -97,6 +96,7 @@ def OptionsAndArguments(argv):
parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
+ parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
opts, encArgs = parser.parse_args(args=argv)
@@ -130,13 +130,16 @@ def OptionsAndArguments(argv):
config._transport = opts.transport
if opts.ha_admin:
- config._ha_admin = True
+ config._conn_options['client_properties'] = {'qpid.ha-admin' : 1}
if opts.ack:
config._ack = opts.ack
if opts.client_sasl_mechanism:
- config._client_sasl_mechanism = opts.client_sasl_mechanism
+ config._conn_options['mechanisms'] = opts.client_sasl_mechanism
+
+ if opts.ssl_certificate:
+ config._conn_options['ssl_certfile'] = opts.ssl_certificate
return args
@@ -147,9 +150,7 @@ class RouteManager:
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
- client_properties = {}
- if config._ha_admin: client_properties["qpid.ha-admin"] = 1
- self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism, client_properties=client_properties)
+ self.broker = self.qmf.addBroker(localBroker, config._connTimeout, **config._conn_options)
self.broker._waitForStable()
self.agent = self.broker.getBrokerAgent()
diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat
index 5a816baf6e..458ae36182 100755
--- a/tools/src/py/qpid-stat
+++ b/tools/src/py/qpid-stat
@@ -42,17 +42,26 @@ class Config:
self._limit = 50
self._increasing = False
self._sortcol = None
- self._sasl_mechanism = None
- self._ha_admin = False
config = Config()
+conn_options = {}
def OptionsAndArguments(argv):
""" Set global variables for options, return arguments """
global config
+ global conn_options
- parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]")
+ usage = \
+"""%prog -g [options]
+ %prog -c [options]
+ %prog -e [options]
+ %prog -q [options] [queue-name]
+ %prog -u [options]
+ %prog -m [options]
+ %prog --acl [options]"""
+
+ parser = OptionParser(usage=usage)
group1 = OptionGroup(parser, "General Options")
group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>",
@@ -61,10 +70,12 @@ def OptionsAndArguments(argv):
help="Maximum time to wait for broker connection (in seconds)")
group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
parser.add_option_group(group1)
- group2 = OptionGroup(parser, "Display Options")
+ group2 = OptionGroup(parser, "Command Options")
group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show")
group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show")
group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show")
@@ -72,12 +83,14 @@ def OptionsAndArguments(argv):
group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show")
group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show")
group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
- group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name")
- group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)")
- group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows")
-
parser.add_option_group(group2)
+ group3 = OptionGroup(parser, "Display Options")
+ group3.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name")
+ group3.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)")
+ group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows")
+ parser.add_option_group(group3)
+
opts, args = parser.parse_args(args=argv)
if not opts.show:
@@ -89,8 +102,15 @@ def OptionsAndArguments(argv):
config._connTimeout = opts.timeout
config._increasing = opts.increasing
config._limit = opts.limit
- config._sasl_mechanism = opts.sasl_mechanism
- config._ha_admin = opts.ha_admin
+
+ if opts.sasl_mechanism:
+ conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+ if opts.ssl_certificate:
+ conn_options['ssl_certfile'] = opts.ssl_certificate
+ if opts.ssl_key:
+ conn_options['ssl_key'] = opts.ssl_key
+ if opts.ha_admin:
+ conn_options['client_properties'] = {'qpid.ha-admin' : 1}
return args
@@ -126,11 +146,9 @@ class BrokerManager:
self.broker = None
self.cluster = None
- def SetBroker(self, brokerUrl, mechanism):
+ def SetBroker(self, brokerUrl):
self.url = brokerUrl
- client_properties={}
- if config._ha_admin: client_properties["qpid.ha-admin"] = 1
- self.connection = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties)
+ self.connection = Connection.establish(self.url, **conn_options)
self.broker = BrokerAgent(self.connection)
def Disconnect(self):
@@ -235,9 +253,10 @@ class BrokerManager:
def displayConn(self):
disp = Display(prefix=" ")
heads = []
- heads.append(Header('client-addr'))
+ heads.append(Header('connection'))
heads.append(Header('cproc'))
heads.append(Header('cpid'))
+ heads.append(Header('mech'))
heads.append(Header('auth'))
heads.append(Header('connected', Header.DURATION))
heads.append(Header('idle', Header.DURATION))
@@ -251,6 +270,7 @@ class BrokerManager:
row.append(conn.address)
row.append(conn.remoteProcessName)
row.append(conn.remotePid)
+ row.append(conn.saslMechanism)
row.append(conn.authIdentity)
row.append(broker.getUpdateTime() - conn.getCreateTime())
row.append(broker.getUpdateTime() - conn.getUpdateTime())
@@ -416,7 +436,8 @@ class BrokerManager:
heads.append(Header("acked", Header.Y))
heads.append(Header("excl", Header.Y))
heads.append(Header("creditMode"))
- heads.append(Header("delivered", Header.KMG))
+ heads.append(Header("delivered", Header.COMMAS))
+ heads.append(Header("sessUnacked", Header.COMMAS))
rows = []
subscriptions = self.broker.getAllSubscriptions()
sessions = self.getSessionMap()
@@ -436,6 +457,7 @@ class BrokerManager:
row.append(s.exclusive)
row.append(s.creditMode)
row.append(s.delivered)
+ row.append(session.unackedMessages)
rows.append(row)
except:
pass
@@ -524,7 +546,7 @@ def main(argv=None):
bm = BrokerManager()
try:
- bm.SetBroker(config._host, config._sasl_mechanism)
+ bm.SetBroker(config._host)
bm.display(args)
bm.Disconnect()
return 0
diff --git a/tools/src/py/qpid-tool b/tools/src/py/qpid-tool
index af948b13a9..4afa18dbb1 100755
--- a/tools/src/py/qpid-tool
+++ b/tools/src/py/qpid-tool
@@ -173,11 +173,11 @@ class Mcli(Cmd):
class QmfData(Console):
"""
"""
- def __init__(self, disp, url):
+ def __init__(self, disp, url, cert):
self.disp = disp
self.url = url
self.session = Session(self, manageConnections=True)
- self.broker = self.session.addBroker(self.url)
+ self.broker = self.session.addBroker(self.url, ssl_certfile=cert)
self.lock = Lock()
self.connected = None
self.closing = None
@@ -455,6 +455,7 @@ class QmfData(Console):
rows.append(row)
else:
print "No object found with ID %d" % dispId
+ return
finally:
self.lock.release()
self.disp.table(caption, heads, rows)
@@ -723,10 +724,13 @@ if _host[0] == '-':
sys.exit(1)
disp = Display()
+cert = None
+if len(cargs) > 1:
+ cert = cargs[1]
# Attempt to make a connection to the target broker
try:
- data = QmfData(disp, _host)
+ data = QmfData(disp, _host, cert)
except Exception, e:
if str(e).find("Exchange not found") != -1:
print "Management not enabled on broker: Use '-m yes' option on broker startup."
diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py
index 0bae786306..ea31aeabb0 100644
--- a/tools/src/py/qpidtoollibs/broker.py
+++ b/tools/src/py/qpidtoollibs/broker.py
@@ -18,6 +18,7 @@
#
from qpid.messaging import Message
+from qpidtoollibs.disp import TimeLong
try:
from uuid import uuid4
except ImportError:
@@ -190,9 +191,13 @@ class BrokerAgent(object):
def getAcl(self):
return self._getSingleObject(Acl)
- def echo(self, sequence, body):
+ def getMemory(self):
+ return self._getSingleObject(Memory)
+
+ def echo(self, sequence = 1, body = "Body"):
"""Request a response to test the path to the management broker"""
- pass
+ args = {'sequence' : sequence, 'body' : body}
+ return self._method('echo', args)
def connect(self, host, port, durable, authMechanism, username, password, transport):
"""Establish a connection to another broker"""
@@ -295,6 +300,41 @@ class BrokerAgent(object):
return self._getBrokerObject(self, _type, oid)
+class EventHelper(object):
+ def eventAddress(self, pkg='*', cls='*', sev='*'):
+ return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev)
+
+ def event(self, msg):
+ return BrokerEvent(msg)
+
+
+class BrokerEvent(object):
+ def __init__(self, msg):
+ self.msg = msg
+ self.content = msg.content[0]
+ self.values = self.content['_values']
+ self.schema_id = self.content['_schema_id']
+ self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name'])
+
+ def __repr__(self):
+ rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name)
+ for k,v in self.values.items():
+ rep = rep + " %s=%s" % (k, v)
+ return rep
+
+ def __getattr__(self, key):
+ if key not in self.values:
+ return None
+ value = self.values[key]
+ return value
+
+ def getAttributes(self):
+ return self.values
+
+ def getTimestamp(self):
+ return self.content['_timestamp']
+
+
class BrokerObject(object):
def __init__(self, broker, content):
self.broker = broker
@@ -362,7 +402,7 @@ class Connection(BrokerObject):
BrokerObject.__init__(self, broker, values)
def close(self):
- pass
+ self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address)
class Session(BrokerObject):
def __init__(self, broker, values):
diff --git a/tools/src/py/qpidtoollibs/disp.py b/tools/src/py/qpidtoollibs/disp.py
index a0c77370a5..21e09f8d00 100644
--- a/tools/src/py/qpidtoollibs/disp.py
+++ b/tools/src/py/qpidtoollibs/disp.py
@@ -167,7 +167,10 @@ class Display:
for head in heads:
width = len (head)
for row in rows:
- cellWidth = len (unicode (row[col]))
+ text = row[col]
+ if text.__class__ == str:
+ text = text.decode('utf-8')
+ cellWidth = len(unicode(text))
if cellWidth > width:
width = cellWidth
colWidth.append (width + self.tableSpacing)
@@ -187,9 +190,12 @@ class Display:
line = self.tablePrefix
col = 0
for width in colWidth:
- line = line + unicode (row[col])
+ text = row[col]
+ if text.__class__ == str:
+ text = text.decode('utf-8')
+ line = line + unicode(text)
if col < len (heads) - 1:
- for i in range (width - len (unicode (row[col]))):
+ for i in range (width - len(unicode(text))):
line = line + " "
col = col + 1
print line