diff options
Diffstat (limited to 'qpid/tools/src/java/qpid-qmf2-tools/src/main/java')
7 files changed, 3680 insertions, 0 deletions
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionAudit.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionAudit.java new file mode 100644 index 0000000000..7f6109cb7a --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionAudit.java @@ -0,0 +1,474 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +// JMS Imports +import javax.jms.Connection; + +// Misc Imports +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +// For DOM parsing the whitelist +import org.w3c.dom.*; +import javax.xml.parsers.*; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.WorkItem; +import org.apache.qpid.qmf2.console.Agent; +import org.apache.qpid.qmf2.console.AgentRestartedWorkItem; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.EventReceivedWorkItem; +import org.apache.qpid.qmf2.console.QmfConsoleData; +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; + +/** + * Audits connections to one or more Qpid message brokers. + * <pre> + * Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated. + * + * If no broker-addr is supplied, ConnectionAudit connects to 'localhost:5672'. + * + * [broker-addr] syntax: + * + * [username/password@] hostname + * ip-address [:<port>] + * + * Examples: + * + * $ ConnectionAudit localhost:5672 + * $ ConnectionAudit 10.1.1.7:10000 + * $ ConnectionAudit guest/guest@broker-host:10000 + * + * Options: + * -h, --help show this help message and exit + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * --whitelist=<whitelist XML document> + * The fully qualified name of the whitelist XML file, + * default is ./whitelist.xml + * + * </pre> + * An example whitelist is illustrated below, note that in this example the exchanges associated with management + * have been whitelisted to remove spurious alerts caused by the temporary management queues. + * <pre> + *<?xml version="1.0" encoding="UTF-8"?> + *<whitelist> + * <exchangeWhitelist> + * <exchange>qmf.default.topic</exchange> + * <exchange>qmf.default.direct</exchange> + * <exchange>qpid.management</exchange> + * <exchange>amq.direct</exchange> + * <exchange></exchange> + * </exchangeWhitelist> + * <queueWhitelist> + * <queue>testqueue</queue> + * </queueWhitelist> + *</whitelist> + * </pre> + + * @author Fraser Adams + */ +public final class ConnectionAudit implements QmfEventListener +{ + private static final String _usage = + "Usage: ConnectionAudit [options] [broker-addr]...\n"; + + private static final String _description = + "Audits connections to one or more Qpid message brokers.\n" + + "Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated.\n" + + "\n" + + "If no broker-addr is supplied, ConnectionAudit connects to 'localhost:5672'.\n" + + "\n" + + "[broker-addr] syntax:\n" + + "\n" + + "[username/password@] hostname\n" + + "ip-address [:<port>]\n" + + "\n" + + "Examples:\n" + + "\n" + + "$ ConnectionAudit localhost:5672\n" + + "$ ConnectionAudit 10.1.1.7:10000\n" + + "$ ConnectionAudit guest/guest@broker-host:10000\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n" + + " --whitelist=<whitelist XML document>\n" + + " The fully qualified name of the whitelist XML file,\n" + + " default is ./whitelist.xml\n"; + + + private final String _url; + private final String _whitelist; + private long _whitelistLastModified = 0; + private Console _console; + + // The sets to be used as the whitelists. + private Set<String> _exchangeWhitelist = new HashSet<String>(); + private Set<String> _queueWhitelist = new HashSet<String>(); + + /** + * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection. + * @param url the connection URL. + * @param connectionOptions the options String to pass to ConnectionHelper. + * @param whitelist the path name of the whitelist XML file. + */ + public ConnectionAudit(final String url, final String connectionOptions, final String whitelist) + { + System.out.println("Connecting to " + url); + _url = url; + _whitelist = whitelist; + try + { + Connection connection = ConnectionHelper.createConnection(url, connectionOptions); + _console = new Console(this); + _console.addConnection(connection); + checkExistingSubscriptions(); + } + catch (QmfException qmfe) + { + System.err.println ("QmfException " + qmfe.getMessage() + " caught in ConnectionAudit constructor"); + } + } + + /** + * When we start up we need to check any subscriptions that already exist against the whitelist. + * Subsequent checks are made only when we receive new subscribe events. + */ + private void checkExistingSubscriptions() + { + readWhitelist(); + List<QmfConsoleData> subscriptions = _console.getObjects("org.apache.qpid.broker", "subscription"); + for (QmfConsoleData subscription : subscriptions) + { + QmfConsoleData queue = dereference(subscription.getRefValue("queueRef")); + QmfConsoleData session = dereference(subscription.getRefValue("sessionRef")); + QmfConsoleData connection = dereference(session.getRefValue("connectionRef")); + + String queueName = queue.getStringValue("name"); + String address = connection.getStringValue("address"); + String timestamp = new Date(subscription.getCreateTime()/1000000l).toString(); + validateQueue(queueName, address, timestamp); + } + } + + /** + * Dereferences an ObjectId returning a QmfConsoleData. + * @param ref the ObjectId to be dereferenced. + * @return the dereferenced QmfConsoleData object or null if the object can't be found. + */ + private QmfConsoleData dereference(final ObjectId ref) + { + List<QmfConsoleData> data = _console.getObjects(ref); + if (data.size() == 1) + { + return data.get(0); + } + return null; + } + + /** + * Looks up the exchange and binding information from the supplied queuename then calls the main validateQueue() + * @param queueName the name of the queue that we want to check against the whitelists. + * @param exchangeName the name of the exchange that the queue we want to check against the whitelists is bound to. + * @param binding the binding associating queue "queueName" with exchange "exchangeName". + * @param address the connection address information for the subscription. + * @param timestamp the timestamp of the subscription. + */ + private void validateQueue(final String queueName, String exchangeName, final QmfConsoleData binding, + final String address, final String timestamp) + { + if (_exchangeWhitelist.contains(exchangeName)) + { // Check exchangeName against the exchangeWhitelist and if it's in there we simply return. + return; + } + + if (_queueWhitelist.contains(queueName)) + { // Check queueName against the queueWhitelist and if it's in there we simply return. + return; + } + + if (exchangeName.equals("")) + { // Make exchangeName render more prettily if necessary. + exchangeName = "''"; + } + + String bindingKey = binding.getStringValue("bindingKey"); + Map arguments = (Map)binding.getValue("arguments"); + if (arguments.isEmpty()) + { + System.out.printf("%s ALERT ConnectionAudit.validateQueue() validation failed for queue: %s with binding[%s] => %s from address: %s with connection timestamp %s\n\n", new Date().toString(), queueName, bindingKey, exchangeName, address, timestamp); + } + else + { // If there are binding arguments then it's a headers exchange so display accordimgly. + System.out.printf("%s ALERT ConnectionAudit.validateQueue() validation failed for queue: %s with binding[%s] => %s %s from address: %s with connection timestamp %s\n\n", new Date().toString(), queueName, bindingKey, exchangeName, arguments, address, timestamp); + } + } + + /** + * Looks up the exchange and binding information from the supplied queuename then calls the main validateQueue() + * @param queueName the name of the queue that we want to check against the whitelists. + * @param address the connection address information for the subscription. + * @param timestamp the timestamp of the subscription. + */ + private void validateQueue(final String queueName, final String address, final String timestamp) + { + ObjectId queueId = null; + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + for (QmfConsoleData queue : queues) + { // We first have to find the ObjectId of the queue called queueName. + if (queue.getStringValue("name").equals(queueName)) + { + queueId = queue.getObjectId(); + break; + } + } + + if (queueId == null) + { + System.out.printf("%s ERROR ConnectionAudit.validateQueue() %s reference couldn't be found\n", + new Date().toString(), queueName); + } + else + { // If we've got the queue's ObjectId we then find the binding that references it. + List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding"); + for (QmfConsoleData binding : bindings) + { + ObjectId queueRef = binding.getRefValue("queueRef"); + if (queueRef.equals(queueId)) + { // We've found a binding that matches queue queueName so look up the associated exchange and validate. + QmfConsoleData exchange = dereference(binding.getRefValue("exchangeRef")); + String exchangeName = exchange.getStringValue("name"); + validateQueue(queueName, exchangeName, binding, address, timestamp); + } + } + } + } + + /** + * Handles WorkItems delivered by the Console. + * <p> + * If we receive an EventReceivedWorkItem check if it is a subscribe event. If it is we check if the whitelist has + * changed, and if it has we re-read it. We then extract the queue name, exchange name, binding, connection address + * and timestamp and validate with the whitelsist. + * <p> + * If we receive an AgentRestartedWorkItem we revalidate all subscriptions as it's possible that a client connection + * could have been made to the broker before ConnectionAudit has successfully re-established its own connections. + * @param wi a QMF2 WorkItem object + */ + public void onEvent(final WorkItem wi) + { + if (wi instanceof EventReceivedWorkItem) + { + EventReceivedWorkItem item = (EventReceivedWorkItem)wi; + QmfEvent event = item.getEvent(); + String className = event.getSchemaClassId().getClassName(); + if (className.equals("subscribe")) + { + readWhitelist(); + String queueName = event.getStringValue("qName"); + String address = event.getStringValue("rhost"); + String timestamp = new Date(event.getTimestamp()/1000000l).toString(); + validateQueue(queueName, address, timestamp); + } + } + else if (wi instanceof AgentRestartedWorkItem) + { + checkExistingSubscriptions(); + } + } + + /** + * This method first checks if the whitelist file exists, if not it clears the sets used as whitelists + * so that no whitelisting is applied. If the whitelist file does exist it is parsed by a DOM parser. + * <p> + * We look for all exchange and queue elements and populate the respective whitelist sets with their + * contents. Note that we check the whitelist file update time to avoid reading it if it hasn't been changed + */ + private void readWhitelist() + { + File file = new File(_whitelist); + if (file.exists()) + { + long mtime = file.lastModified(); + if (mtime != _whitelistLastModified) + { + _whitelistLastModified = mtime; + _exchangeWhitelist.clear(); + _queueWhitelist.clear(); + + try + { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder = factory.newDocumentBuilder(); + Document doc = docBuilder.parse(file); + + Element whitelist = doc.getDocumentElement(); + if (whitelist.getNodeName().equals("whitelist")) + { + NodeList children = whitelist.getChildNodes(); + for (int i = 0; i < children.getLength(); i++) + { + Node child = children.item(i); + if (child.getNodeName().equals("exchangeWhitelist")) + { + NodeList exchanges = child.getChildNodes(); + for (int j = 0; j < exchanges.getLength(); j++) + { + Node node = exchanges.item(j); + if (node.getNodeName().equals("exchange")) + { + if (node.hasChildNodes()) + { + String exchange = node.getFirstChild().getNodeValue(); + _exchangeWhitelist.add(exchange); + } + else + { + _exchangeWhitelist.add(""); + } + } + } + } + else if (child.getNodeName().equals("queueWhitelist")) + { + NodeList queues = child.getChildNodes(); + for (int j = 0; j < queues.getLength(); j++) + { + Node node = queues.item(j); + if (node.getNodeName().equals("queue")) + { + if (node.hasChildNodes()) + { + String queue = node.getFirstChild().getNodeValue(); + _queueWhitelist.add(queue); + } + } + } + } + } + } + } + catch (Exception e) + { // Failed to parse correctly. + System.out.println("Exception " + e + " while reading " + _whitelist); + System.out.println(new Date().toString() + " WARN ConnectionAudit.readWhitelist() " + + _whitelist + " failed: " + e.getMessage()); + return; + } + } + } + else + { // If whitelist file doesn't exist log a warning and clear the whitelists. + System.out.println(new Date().toString() + " WARN ConnectionAudit.readWhitelist() " + + _whitelist + " doesn't exist"); + _exchangeWhitelist.clear(); + _queueWhitelist.clear(); + } + } // End of readWhitelist() + + /** + * Runs ConnectionAudit. + * @param args the command line arguments. + */ + public static void main(final String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + String[] longOpts = {"help", "whitelist=", "sasl-mechanism="}; + try + { + String connectionOptions = "{reconnect: true}"; + String whitelist = "./whitelist.xml"; + GetOpt getopt = new GetOpt(args, "h", longOpts); + List<String[]> optList = getopt.getOptList(); + String[] cargs = {}; + cargs = getopt.getEncArgs().toArray(cargs); + for (String[] opt : optList) + { + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + System.out.println(_usage); + System.out.println(_description); + System.out.println(_options); + System.exit(1); + } + else if (opt[0].equals("--whitelist")) + { + whitelist = opt[1]; + } + else if (opt[0].equals("--sasl-mechanism")) + { + connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}"; + } + } + + int nargs = cargs.length; + if (nargs == 0) + { + cargs = new String[] {"localhost"}; + } + + for (String url : cargs) + { + ConnectionAudit eventPrinter = new ConnectionAudit(url, connectionOptions, whitelist); + } + } + catch (IllegalArgumentException e) + { + System.out.println(_usage); + System.exit(1); + } + + try + { // Block here + Thread.currentThread().join(); + } + catch (InterruptedException ie) + { + } + } +} diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionLogger.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionLogger.java new file mode 100644 index 0000000000..f9364b4069 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionLogger.java @@ -0,0 +1,383 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +// JMS Imports +import javax.jms.Connection; + +// Misc Imports +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Map; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.WorkItem; +import org.apache.qpid.qmf2.console.Agent; +import org.apache.qpid.qmf2.console.AgentHeartbeatWorkItem; +import org.apache.qpid.qmf2.console.AgentRestartedWorkItem; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.EventReceivedWorkItem; +import org.apache.qpid.qmf2.console.QmfConsoleData; +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; + +/** + * ConnectionLogger is a QMF2 class used to provide information about connections made to a broker. + * <p> + * In default mode ConnectionLogger lists the connections made to a broker along with information about sessions + * such as whether any subscriptions are associated with the session (if a session has no subscriptions then it's + * quite likely to be a "producer only" session, so this knowledge is quite useful). + * <p> + * In "log queue and binding" mode the information provided is very similar to qpid-config -b queues but with + * additional connection related information provided as with default mode. + * + * <pre> + * Usage: ConnectionLogger [options] + * + * Options: + * -h, --help show this help message and exit + * -q log queue and binding information for consumer connection + * -a <address>, --broker-address=<address> + * broker-addr is in the form: [username/password@] + * hostname | ip-address [:<port>] ex: localhost, + * 10.1.1.7:10000, broker-host:10000, + * guest/guest@localhost + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * </pre> + * @author Fraser Adams + */ +public final class ConnectionLogger implements QmfEventListener +{ + private static final String _usage = + "Usage: ConnectionLogger [options]\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + " -q log queue and binding information for consumer connections\n" + + " -a <address>, --broker-address=<address>\n" + + " broker-addr is in the form: [username/password@]\n" + + " hostname | ip-address [:<port>] ex: localhost,\n" + + " 10.1.1.7:10000, broker-host:10000,\n" + + " guest/guest@localhost\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n"; + + private Console _console; + private boolean _logQueues; + + // If any queues get added or deleted we set this to flag that we need to re-log connections on next heartbeat. + private boolean _stateChanged = false; + + /** + * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection. + * @param url the connection URL. + * @param connectionOptions the options String to pass to ConnectionHelper. + * @param logQueues flags whether queue & binding information is logged as well as connection info. + */ + public ConnectionLogger(final String url, final String connectionOptions, final boolean logQueues) + { + try + { + Connection connection = ConnectionHelper.createConnection(url, connectionOptions); + _console = new Console(this); + _console.addConnection(connection); + _logQueues = logQueues; + System.out.println("Hit Return to exit"); + logConnectionInformation(); + } + catch (QmfException qmfe) + { + System.err.println ("QmfException " + qmfe.getMessage() + " caught in ConnectionLogger constructor"); + } + } + + /** + * Finds a QmfConsoleData in a List of QMF Objects that matches a given ObjectID + * + * More or less a direct Java port of findById from qpid-config + * + * @param items the List of QMF Objects to search + * @param id the ObjectId we're searching the List for + * @return return the found object as a QmfConsoleData else return null + */ + private QmfConsoleData findById(final List<QmfConsoleData> items, final ObjectId id) + { + for (QmfConsoleData item : items) + { + if (item.getObjectId().equals(id)) + { + return item; + } + } + + return null; + } + + /** + * For every queue list the bindings (equivalent of qpid-config -b queues) + * + * More or less a direct Java port of QueueListRecurse in qpid-config, which handles qpid-config -b queues + * + * @param ref If ref is null list info about all queues else list info about queue referenced by ObjectID + */ + private void logQueueInformation(final ObjectId ref) + { + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding"); + List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange"); + + for (QmfConsoleData queue : queues) + { + ObjectId queueId = queue.getObjectId(); + + if (ref == null || ref.equals(queueId)) + { + System.out.printf(" Queue '%s'\n", queue.getStringValue("name")); + System.out.println(" arguments " + (Map)queue.getValue("arguments")); + + for (QmfConsoleData binding : bindings) + { + ObjectId queueRef = binding.getRefValue("queueRef"); + + if (queueRef.equals(queueId)) + { + ObjectId exchangeRef = binding.getRefValue("exchangeRef"); + QmfConsoleData exchange = findById(exchanges, exchangeRef); + + String exchangeName = "<unknown>"; + if (exchange != null) + { + exchangeName = exchange.getStringValue("name"); + if (exchangeName.equals("")) + { + exchangeName = "''"; + } + } + + String bindingKey = binding.getStringValue("bindingKey"); + Map arguments = (Map)binding.getValue("arguments"); + if (arguments.isEmpty()) + { + System.out.printf(" bind [%s] => %s\n", bindingKey, exchangeName); + } + else + { + // If there are binding arguments then it's a headers exchange + System.out.printf(" bind [%s] => %s %s\n", bindingKey, exchangeName, arguments); + } + } + } + } + } + } + + /** + * Logs audit information about each connection made to the broker + * + * Obtains connection, session and subscription objects and iterates in turn through these comparing + * references to find the subscriptions association with sessions and sessions associated with + * connections. Ultimately it then uses logQueueInformation to display the queues associated with + * each subscription. + */ + private void logConnectionInformation() + { + System.out.println("\n\n**** ConnectionLogger: Logging current connection information ****"); + + List<QmfConsoleData> connections = _console.getObjects("org.apache.qpid.broker", "connection"); + List<QmfConsoleData> sessions = _console.getObjects("org.apache.qpid.broker", "session"); + List<QmfConsoleData> subscriptions = _console.getObjects("org.apache.qpid.broker", "subscription"); + + for (QmfConsoleData connection : connections) + { + System.out.printf("\nConnection '%s'\n", connection.getStringValue("address")); + + String[] properties = {"authIdentity","remoteProcessName", "federationLink"}; + for (String p : properties) + { + System.out.println(p + ": " + connection.getStringValue(p)); + } + + System.out.println("createTimestamp: " + new Date(connection.getCreateTime()/1000000l)); + + ObjectId connectionId = connection.getObjectId(); + for (QmfConsoleData session : sessions) + { // Iterate through all session objects + ObjectId connectionRef = session.getRefValue("connectionRef"); + if (connectionRef.equals(connectionId)) + { // But only select sessions that are associated with the connection under consideration. + System.out.printf("Session '%s'\n", session.getStringValue("name")); + int subscriptionCount = 0; + ObjectId sessionId = session.getObjectId(); + for (QmfConsoleData subscription : subscriptions) + { // Iterate through all subscription objects + ObjectId sessionRef = subscription.getRefValue("sessionRef"); + if (sessionRef.equals(sessionId)) + { // But only select subscriptions that are associated with the session under consideration. + subscriptionCount++; + ObjectId queueRef = subscription.getRefValue("queueRef"); + if (_logQueues) + { + logQueueInformation(queueRef); + } + } + } + if (subscriptionCount == 0) + { + System.out.println(" ** No Subscriptions for this Session - probably a producer only Session **"); + } + } + } + } + } + + /** + * Listener for QMF2 WorkItems + * <p> + * This method looks for clientConnect or clientDisconnect Events and uses these as a trigger to log the new + * connection state when the next Heartbeat occurs. + * <p> + * There are a couple of reasons for using this approach rather than just calling logConnectionInformation() + * as soon as we see the clientConnect or clientDisconnect Event. + * <p> + * 1. We could potentially have lots of connection Events and redisplaying all of the connections for each + * Event is likely to be confusing. + * <p> + * 2. When a clientConnect Event occurs we don't have all of the informatin that we might need, for example this + * application checks the Session and Subscription information and also optionally Queue and Binding information. + * Creating Sessions/Subscriptions won't generally occur until some (possibly small, but possibly not) time + * after the Connection has been made. The approach taken here reduces spurious assertions that a Session is + * probably a "producer only" Session. As one of the use-cases for this tool is to attempt to flag up "producer + * only" Sessions we want to try and make it as reliable as possible. + * + * @param wi a QMF2 WorkItem object + */ + public void onEvent(final WorkItem wi) + { + if (wi instanceof EventReceivedWorkItem) + { + EventReceivedWorkItem item = (EventReceivedWorkItem)wi; + Agent agent = item.getAgent(); + QmfEvent event = item.getEvent(); + + String className = event.getSchemaClassId().getClassName(); + if (className.equals("clientConnect") || + className.equals("clientDisconnect")) + { + _stateChanged = true; + } + } + else if (wi instanceof AgentRestartedWorkItem) + { + _stateChanged = true; + } + else if (wi instanceof AgentHeartbeatWorkItem) + { + AgentHeartbeatWorkItem item = (AgentHeartbeatWorkItem)wi; + Agent agent = item.getAgent(); + + if (_stateChanged && agent.getName().contains("qpidd")) + { + logConnectionInformation(); + _stateChanged = false; + } + } + } + + /** + * Runs ConnectionLogger. + * @param args the command line arguments. + */ + public static void main(final String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + String[] longOpts = {"help", "broker-address=", "sasl-mechanism="}; + try + { + String host = "localhost"; + String connectionOptions = "{reconnect: true}"; + boolean logQueues = false; + + GetOpt getopt = new GetOpt(args, "ha:q", longOpts); + List<String[]> optList = getopt.getOptList(); + String[] cargs = {}; + cargs = getopt.getEncArgs().toArray(cargs); + + for (String[] opt : optList) + { + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + System.out.println(_usage); + System.out.println(_options); + System.exit(1); + } + else if (opt[0].equals("-a") || opt[0].equals("--broker-address")) + { + host = opt[1]; + } + else if (opt[0].equals("-q")) + { + logQueues = true; + } + else if (opt[0].equals("--sasl-mechanism")) + { + connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}"; + } + } + + ConnectionLogger logger = new ConnectionLogger(host, connectionOptions, logQueues); + } + catch (IllegalArgumentException e) + { + System.out.println(_usage); + System.out.println(e.getMessage()); + System.exit(1); + } + + BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in)); + try + { // Blocks here until return is pressed + String s = commandLine.readLine(); + System.exit(0); + } + catch (IOException e) + { + System.out.println ("ConnectionLogger main(): IOException: " + e.getMessage()); + } + } +} diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java new file mode 100644 index 0000000000..e4262b0577 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java @@ -0,0 +1,1517 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +import javax.jms.Connection; + +// Misc Imports +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStreamReader; +import java.io.IOException; + +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfData; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.QmfConsoleData; +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; + +/** + * QpidConfig is a fairly "literal" Java port of the python qpid-config tool. + * <p> + * It's vaguely pointless, as the python qpid-config is the "canonical" qpid-config :-) + * Nonetheless, it's a useful intellectual exercise to illustrate using QMF2 from Java. + * <p> + * QpidConfig (unlike the python qpid-config) uses pure QMF2 for adding/deleting queues, exchanges & bindings + * this provides useful illustration of how to do these things using the ManagementAgent method calls. + * <p> + * N.B. "create" and "delete" broker ManagementAgent methods were added in Qpid version 0.10, unfortunately these + * calls won't work for earlier versions of Qpid. + * <pre> + * Usage: qpid-config [OPTIONS] + * qpid-config [OPTIONS] exchanges [filter-string] + * qpid-config [OPTIONS] queues [filter-string] + * qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions] + * qpid-config [OPTIONS] del exchange <name> + * qpid-config [OPTIONS] add queue <name> [AddQueueOptions] + * qpid-config [OPTIONS] del queue <name> [DelQueueOptions] + * qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key] + * <for type xml> [-f -|filename] + * <for type header> [all|any] k1=v1 [, k2=v2...] + * qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key] + * + * ADDRESS syntax: + * + * [username/password@] hostname + * ip-address [:<port>] + * + * Examples: + * + * $ qpid-config add queue q + * $ qpid-config add exchange direct d localhost:5672 + * $ qpid-config exchanges 10.1.1.7:10000 + * $ qpid-config queues guest/guest@broker-host:10000 + * + * Add Exchange <type> values: + * + * direct Direct exchange for point-to-point communication + * fanout Fanout exchange for broadcast communication + * topic Topic exchange that routes messages using binding keys with wildcards + * headers Headers exchange that matches header fields against the binding keys + * xml XML Exchange - allows content filtering using an XQuery + * + * + * Queue Limit Actions + * + * none (default) - Use broker's default policy + * reject - Reject enqueued messages + * flow-to-disk - Page messages to disk + * ring - Replace oldest unacquired message with new + * ring-strict - Replace oldest message, reject if oldest is acquired + * + * Queue Ordering Policies + * + * fifo (default) - First in, first out + * lvq - Last Value Queue ordering, allows queue browsing + * lvq-no-browse - Last Value Queue ordering, browsing clients may lose data + * + * Options: + * -h, --help show this help message and exit + * + * General Options: + * -t <secs>, --timeout=<secs> + * Maximum time to wait for broker connection (in + * seconds) + * -b, --bindings Show bindings in queue or exchange list + * -a <address>, --broker-addr=<address> + * Maximum time to wait for broker connection (in + * seconds) + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * + * Options for Adding Exchanges and Queues: + * --alternate-exchange=<aexname> + * Name of the alternate-exchange for the new queue or + * exchange. Exchanges route messages to the alternate + * exchange if they are unable to route them elsewhere. + * Queues route messages to the alternate exchange if + * they are rejected by a subscriber or orphaned by queue + * deletion. + * --passive, --dry-run + * Do not actually add the exchange or queue, ensure that + * all parameters and permissions are correct and would + * allow it to be created. + * --durable The new queue or exchange is durable. + * + * Options for Adding Queues: + * --file-count=<n> Number of files in queue's persistence journal + * --file-size=<n> File size in pages (64Kib/page) + * --max-queue-size=<n> + * Maximum in-memory queue size as bytes + * --max-queue-count=<n> + * Maximum in-memory queue size as a number of messages + * --limit-policy=<policy> + * Action to take when queue limit is reached + * --order=<ordering> Queue ordering policy + * --flow-stop-size=<n> + * Turn on sender flow control when the number of queued + * bytes exceeds this value. + * --flow-resume-size=<n> + * Turn off sender flow control when the number of queued + * bytes drops below this value. + * --flow-stop-count=<n> + * Turn on sender flow control when the number of queued + * messages exceeds this value. + * --flow-resume-count=<n> + * Turn off sender flow control when the number of queued + * messages drops below this value. + * --argument=<NAME=VALUE> + * Specify a key-value pair to add to queue arguments + * + * Options for Adding Exchanges: + * --sequence Exchange will insert a 'qpid.msg_sequence' field in + * the message header + * --ive Exchange will behave as an 'initial-value-exchange', + * keeping a reference to the last message forwarded and + * enqueuing that message to newly bound queues. + * + * Options for Deleting Queues: + * --force Force delete of queue even if it's currently used or + * it's not empty + * --force-if-not-empty + * Force delete of queue even if it's not empty + * --force-if-not-used + * Force delete of queue even if it's currently used + * + * Options for Declaring Bindings: + * -f <file.xq>, --file=<file.xq> + * For XML Exchange bindings - specifies the name of a + * file containing an XQuery. + * + * </pre> + * @author Fraser Adams + */ +public final class QpidConfig +{ + private static final String _usage = + "Usage: qpid-config [OPTIONS]\n" + + " qpid-config [OPTIONS] exchanges [filter-string]\n" + + " qpid-config [OPTIONS] queues [filter-string]\n" + + " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]\n" + + " qpid-config [OPTIONS] del exchange <name>\n" + + " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]\n" + + " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]\n" + + " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]\n" + + " <for type xml> [-f -|filename]\n" + + " <for type header> [all|any] k1=v1 [, k2=v2...]\n" + + " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]\n"; + + private static final String _description = + "ADDRESS syntax:\n" + + "\n" + + " [username/password@] hostname\n" + + " ip-address [:<port>]\n" + + "\n" + + "Examples:\n" + + "\n" + + "$ qpid-config add queue q\n" + + "$ qpid-config add exchange direct d localhost:5672\n" + + "$ qpid-config exchanges 10.1.1.7:10000\n" + + "$ qpid-config queues guest/guest@broker-host:10000\n" + + "\n" + + "Add Exchange <type> values:\n" + + "\n" + + " direct Direct exchange for point-to-point communication\n" + + " fanout Fanout exchange for broadcast communication\n" + + " topic Topic exchange that routes messages using binding keys with wildcards\n" + + " headers Headers exchange that matches header fields against the binding keys\n" + + " xml XML Exchange - allows content filtering using an XQuery\n" + + "\n" + + "\n" + + "Queue Limit Actions\n" + + "\n" + + " none (default) - Use broker's default policy\n" + + " reject - Reject enqueued messages\n" + + " flow-to-disk - Page messages to disk\n" + + " ring - Replace oldest unacquired message with new\n" + + " ring-strict - Replace oldest message, reject if oldest is acquired\n" + + "\n" + + "Queue Ordering Policies\n" + + "\n" + + " fifo (default) - First in, first out\n" + + " lvq - Last Value Queue ordering, allows queue browsing\n" + + " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + "\n" + + " General Options:\n" + + " -t <secs>, --timeout=<secs>\n" + + " Maximum time to wait for broker connection (in\n" + + " seconds)\n" + + " -b, --bindings Show bindings in queue or exchange list\n" + + " -a <address>, --broker-addr=<address>\n" + + " Maximum time to wait for broker connection (in\n" + + " seconds)\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n" + + "\n" + + " Options for Adding Exchanges and Queues:\n" + + " --alternate-exchange=<aexname>\n" + + " Name of the alternate-exchange for the new queue or\n" + + " exchange. Exchanges route messages to the alternate\n" + + " exchange if they are unable to route them elsewhere.\n" + + " Queues route messages to the alternate exchange if\n" + + " they are rejected by a subscriber or orphaned by queue\n" + + " deletion.\n" + + " --passive, --dry-run\n" + + " Do not actually add the exchange or queue, ensure that\n" + + " all parameters and permissions are correct and would\n" + + " allow it to be created.\n" + + " --durable The new queue or exchange is durable.\n" + + "\n" + + " Options for Adding Queues:\n" + + " --file-count=<n> Number of files in queue's persistence journal\n" + + " --file-size=<n> File size in pages (64Kib/page)\n" + + " --max-queue-size=<n>\n" + + " Maximum in-memory queue size as bytes\n" + + " --max-queue-count=<n>\n" + + " Maximum in-memory queue size as a number of messages\n" + + " --limit-policy=<policy>\n" + + " Action to take when queue limit is reached\n" + + " --order=<ordering> Queue ordering policy\n" + + " --flow-stop-size=<n>\n" + + " Turn on sender flow control when the number of queued\n" + + " bytes exceeds this value.\n" + + " --flow-resume-size=<n>\n" + + " Turn off sender flow control when the number of queued\n" + + " bytes drops below this value.\n" + + " --flow-stop-count=<n>\n" + + " Turn on sender flow control when the number of queued\n" + + " messages exceeds this value.\n" + + " --flow-resume-count=<n>\n" + + " Turn off sender flow control when the number of queued\n" + + " messages drops below this value.\n" + + " --argument=<NAME=VALUE>\n" + + " Specify a key-value pair to add to queue arguments\n" + + "\n" + + " Options for Adding Exchanges:\n" + + " --sequence Exchange will insert a 'qpid.msg_sequence' field in\n" + + " the message header\n" + + " --ive Exchange will behave as an 'initial-value-exchange',\n" + + " keeping a reference to the last message forwarded and\n" + + " enqueuing that message to newly bound queues.\n" + + "\n" + + " Options for Deleting Queues:\n" + + " --force Force delete of queue even if it's currently used or\n" + + " it's not empty\n" + + " --force-if-not-empty\n" + + " Force delete of queue even if it's not empty\n" + + " --force-if-not-used\n" + + " Force delete of queue even if it's currently used\n" + + "\n" + + " Options for Declaring Bindings:\n" + + " -f <file.xq>, --file=<file.xq>\n" + + " For XML Exchange bindings - specifies the name of a\n" + + " file containing an XQuery.\n"; + + private Console _console; + private QmfConsoleData _broker; + + private boolean _recursive = false; + private String _host = "localhost"; + private int _connTimeout = 10; + private String _altExchange = null; + private boolean _passive = false; + private boolean _durable = false; + private boolean _ifEmpty = true; + private boolean _ifUnused = true; + private long _fileCount = 8; + private long _fileSize = 24; + private long _maxQueueSize = 0; + private long _maxQueueCount = 0; + private String _limitPolicy = "none"; + private String _order = "fifo"; + private boolean _msgSequence = false; + private boolean _ive = false; + private String _file = null; + + // New to Qpid 0.10 qpid-config + private String _saslMechanism = null; + private long _flowStopCount = 0; + private long _flowResumeCount = 0; + private long _flowStopSize = 0; + private long _flowResumeSize = 0; + private List<String> extraArguments = new ArrayList<String>(); + + private static final String FILECOUNT = "qpid.file_count"; + private static final String FILESIZE = "qpid.file_size"; + private static final String MAX_QUEUE_SIZE = "qpid.max_size"; + private static final String MAX_QUEUE_COUNT = "qpid.max_count"; + private static final String POLICY_TYPE = "qpid.policy_type"; + private static final String LVQ = "qpid.last_value_queue"; + private static final String LVQNB = "qpid.last_value_queue_no_browse"; + private static final String MSG_SEQUENCE = "qpid.msg_sequence"; + private static final String IVE = "qpid.ive"; + private static final String FLOW_STOP_COUNT = "qpid.flow_stop_count"; + private static final String FLOW_RESUME_COUNT = "qpid.flow_resume_count"; + private static final String FLOW_STOP_SIZE = "qpid.flow_stop_size"; + private static final String FLOW_RESUME_SIZE = "qpid.flow_resume_size"; + + // There are various arguments to declare that have specific program options in this utility. + // However there is now a generic mechanism for passing arguments as well. The SPECIAL_ARGS + // set contains the arguments for which there are specific program options defined i.e. the + // arguments for which there is special processing on add and list. + private static HashSet<String> SPECIAL_ARGS = new HashSet<String>(); + + static + { + SPECIAL_ARGS.add(FILECOUNT); + SPECIAL_ARGS.add(FILESIZE); + SPECIAL_ARGS.add(MAX_QUEUE_SIZE); + SPECIAL_ARGS.add(MAX_QUEUE_COUNT); + SPECIAL_ARGS.add(POLICY_TYPE); + SPECIAL_ARGS.add(LVQ); + SPECIAL_ARGS.add(LVQNB); + SPECIAL_ARGS.add(MSG_SEQUENCE); + SPECIAL_ARGS.add(IVE); + SPECIAL_ARGS.add(FLOW_STOP_COUNT); + SPECIAL_ARGS.add(FLOW_RESUME_COUNT); + SPECIAL_ARGS.add(FLOW_STOP_SIZE); + SPECIAL_ARGS.add(FLOW_RESUME_SIZE); + } + + /** + * Display long-form QpidConfig usage. + */ + private void usage() + { + System.out.println(_usage); + System.exit(1); + } + + /** + * Display QpidConfig options. + */ + private void options() + { + System.out.println(_usage); + System.out.println(_description); + System.out.println(_options); + System.exit(1); + } + + /** + * Finds a QmfConsoleData instance in a List of QMF Objects that matches a given ObjectID. + * + * More or less a direct Java port of findById from qpid-config. + * + * @param items the List of QMF Objects to search. + * @param id the ObjectId we're searching the List for. + * @return return the found object as a QmfConsoleData else return null. + */ + private QmfConsoleData findById(final List<QmfConsoleData> items, final ObjectId id) + { + for (QmfConsoleData item : items) + { + if (item.getObjectId().equals(id)) + { + return item; + } + } + + return null; + } + + /** + * Provide a basic overview of the number and type of queues and exchanges. + */ + private void overview() + { + List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange"); + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + + System.out.printf("Total Exchanges: %d\n", exchanges.size()); + + Map<String, AtomicInteger> etype = new HashMap<String, AtomicInteger>(); + for (QmfConsoleData exchange : exchanges) + { + String exchangeType = exchange.getStringValue("type"); + AtomicInteger n = etype.get(exchangeType); + if (n == null) + { + etype.put(exchangeType, new AtomicInteger(1)); + } + else + { + n.getAndIncrement(); + } + } + + for (Map.Entry<String, AtomicInteger> entry : etype.entrySet()) + { + System.out.printf("%15s: %s\n", entry.getKey(), entry.getValue()); + } + + System.out.println(); + System.out.printf(" Total Queues: %d\n", queues.size()); + + int durable = 0; + for (QmfConsoleData queue : queues) + { + boolean isDurable = queue.getBooleanValue("durable"); + if (isDurable) + { + durable++; + } + } + + System.out.printf(" durable: %d\n", durable); + System.out.printf(" non-durable: %d\n", queues.size() - durable); + } + + /** + * For every exchange list detailed info (equivalent of qpid-config exchanges). + * + * More or less a direct Java port of ExchangeList in qpid-config, which handles qpid-config exchanges. + * + * @param filter specifies the exchange name to display info for, if set to "" displays info for every exchange. + */ + private void exchangeList(final String filter) + { + List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange"); + + String caption1 = "Type "; + String caption2 = "Exchange Name"; + int maxNameLen = caption2.length(); + + for (QmfConsoleData exchange : exchanges) + { + String name = exchange.getStringValue("name"); + if (filter.equals("") || filter.equals(name)) + { + if (name.length() > maxNameLen) + { + maxNameLen = name.length(); + } + } + } + + System.out.printf("%s%-" + maxNameLen + "s Attributes\n", caption1, caption2); + + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < (((maxNameLen + caption1.length()) / 5) + 5); i++) + { + buf.append("====="); + } + String line = buf.toString(); + System.out.println(line); + + for (QmfConsoleData exchange : exchanges) + { + String name = exchange.getStringValue("name"); + if (filter.equals("") || filter.equals(name)) + { + System.out.printf("%-10s%-" + maxNameLen + "s ", exchange.getStringValue("type"), name); + Map args = (Map)exchange.getValue("arguments"); + args = (args == null) ? Collections.EMPTY_MAP : args; + + if (exchange.getBooleanValue("durable")) + { + System.out.printf("--durable "); + } + + if (args.containsKey(MSG_SEQUENCE) && QmfData.getLong(args.get(MSG_SEQUENCE)) == 1) + { + System.out.printf("--sequence "); + } + + if (args.containsKey(IVE) && QmfData.getLong(args.get(IVE)) == 1) + { + System.out.printf("--ive "); + } + + if (exchange.hasValue("altExchange")) + { + ObjectId altExchangeRef = exchange.getRefValue("altExchange"); + QmfConsoleData altExchange = findById(exchanges, altExchangeRef); + if (altExchange != null) + { + System.out.printf("--alternate-exchange=%s", altExchange.getStringValue("name")); + } + } + + System.out.println(); + } + } + } + + /** + * For every exchange list the bindings (equivalent of qpid-config -b exchanges). + * + * More or less a direct Java port of ExchangeListRecurse in qpid-config, which handles qpid-config -b exchanges. + * + * @param filter specifies the exchange name to display info for, if set to "" displays info for every exchange. + */ + private void exchangeListRecurse(final String filter) + { + List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange"); + List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding"); + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + + for (QmfConsoleData exchange : exchanges) + { + ObjectId exchangeId = exchange.getObjectId(); + String name = exchange.getStringValue("name"); + + if (filter.equals("") || filter.equals(name)) + { + System.out.printf("Exchange '%s' (%s)\n", name, exchange.getStringValue("type")); + for (QmfConsoleData binding : bindings) + { + ObjectId exchangeRef = binding.getRefValue("exchangeRef"); + + if (exchangeRef.equals(exchangeId)) + { + ObjectId queueRef = binding.getRefValue("queueRef"); + QmfConsoleData queue = findById(queues, queueRef); + + String queueName = "<unknown>"; + if (queue != null) + { + queueName = queue.getStringValue("name"); + if (queueName.equals("")) + { + queueName = "''"; + } + } + + String bindingKey = binding.getStringValue("bindingKey"); + Map arguments = (Map)binding.getValue("arguments"); + if (arguments == null || arguments.isEmpty()) + { + System.out.printf(" bind [%s] => %s\n", bindingKey, queueName); + } + else + { + // If there are binding arguments then it's a headers exchange + System.out.printf(" bind [%s] => %s %s\n", bindingKey, queueName, arguments); + } + } + } + } + } + } + + /** + * For every queue list detailed info (equivalent of qpid-config queues). + * + * More or less a direct Java port of QueueList in qpid-config, which handles qpid-config queues. + * + * @param filter specifies the queue name to display info for, if set to "" displays info for every queue. + */ + private void queueList(final String filter) + { + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + + String caption = "Queue Name"; + int maxNameLen = caption.length(); + + for (QmfConsoleData queue : queues) + { + String name = queue.getStringValue("name"); + if (filter.equals("") || filter.equals(name)) + { + if (name.length() > maxNameLen) + { + maxNameLen = name.length(); + } + } + } + + System.out.printf("%-" + maxNameLen + "s Attributes\n", caption); + + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < ((maxNameLen / 5) + 5); i++) + { + buf.append("====="); + } + String line = buf.toString(); + System.out.println(line); + + for (QmfConsoleData queue : queues) + { + String name = queue.getStringValue("name"); + if (filter.equals("") || filter.equals(name)) + { + System.out.printf("%-" + maxNameLen + "s ", name); + Map<String, Object> args = queue.<Map<String, Object>>getValue("arguments"); + args = (args == null) ? Collections.EMPTY_MAP : args; +/*System.out.println(args); +for (Map.Entry<String, Object> entry : args.entrySet()) { + System.out.println(entry.getKey() + " " + entry.getValue().getClass().getCanonicalName()); +}*/ + if (queue.getBooleanValue("durable")) + { + System.out.printf("--durable "); + } + + if (queue.getBooleanValue("autoDelete")) + { + System.out.printf("auto-del "); + } + + if (queue.getBooleanValue("exclusive")) + { + System.out.printf("excl "); + } + + if (args.containsKey(FILESIZE)) + { + System.out.printf("--file-size=%d ", QmfData.getLong(args.get(FILESIZE))); + } + + if (args.containsKey(FILECOUNT)) + { + System.out.printf("--file-count=%d ", QmfData.getLong(args.get(FILECOUNT))); + } + + if (args.containsKey(MAX_QUEUE_SIZE)) + { + System.out.printf("--max-queue-size=%d ", QmfData.getLong(args.get(MAX_QUEUE_SIZE))); + } + + if (args.containsKey(MAX_QUEUE_COUNT)) + { + System.out.printf("--max-queue-count=%d ", QmfData.getLong(args.get(MAX_QUEUE_COUNT))); + } + + if (args.containsKey(POLICY_TYPE)) + { + System.out.printf("--limit-policy=%s ", (QmfData.getString(args.get(POLICY_TYPE))).replace("_", "-")); + } + + if (args.containsKey(LVQ) && QmfData.getLong(args.get(LVQ)) == 1) + { + System.out.printf("--order lvq "); + } + + if (args.containsKey(LVQNB) && QmfData.getLong(args.get(LVQNB)) == 1) + { + System.out.printf("--order lvq-no-browse "); + } + + if (queue.hasValue("altExchange")) + { + ObjectId altExchangeRef = queue.getRefValue("altExchange"); + List<QmfConsoleData> altExchanges = _console.getObjects(altExchangeRef); + if (altExchanges.size() == 1) + { + QmfConsoleData altExchange = altExchanges.get(0); + System.out.printf("--alternate-exchange=%s", altExchange.getStringValue("name")); + } + } + + if (args.containsKey(FLOW_STOP_SIZE)) + { + System.out.printf("--flow-stop-size=%d ", QmfData.getLong(args.get(FLOW_STOP_SIZE))); + } + + if (args.containsKey(FLOW_RESUME_SIZE)) + { + System.out.printf("--flow-resume-size=%d ", QmfData.getLong(args.get(FLOW_RESUME_SIZE))); + } + + if (args.containsKey(FLOW_STOP_COUNT)) + { + System.out.printf("--flow-stop-count=%d ", QmfData.getLong(args.get(FLOW_STOP_COUNT))); + } + + if (args.containsKey(FLOW_RESUME_COUNT)) + { + System.out.printf("--flow-resume-count=%d ", QmfData.getLong(args.get(FLOW_RESUME_COUNT))); + } + + for (Map.Entry<String, Object> entry : args.entrySet()) + { // Display generic queue arguments + if (!SPECIAL_ARGS.contains(entry.getKey())) + { + System.out.printf("--argument %s=%s ", entry.getKey(), entry.getValue()); + } + } + + System.out.println(); + } + } + } + + /** + * For every queue list the bindings (equivalent of qpid-config -b queues). + * + * More or less a direct Java port of QueueListRecurse in qpid-config, which handles qpid-config -b queues. + * + * @param filter specifies the queue name to display info for, if set to "" displays info for every queue. + */ + private void queueListRecurse(final String filter) + { + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding"); + List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange"); + + for (QmfConsoleData queue : queues) + { + ObjectId queueId = queue.getObjectId(); + String name = queue.getStringValue("name"); + + if (filter.equals("") || filter.equals(name)) + { + System.out.printf("Queue '%s'\n", name); + + for (QmfConsoleData binding : bindings) + { + ObjectId queueRef = binding.getRefValue("queueRef"); + + if (queueRef.equals(queueId)) + { + ObjectId exchangeRef = binding.getRefValue("exchangeRef"); + QmfConsoleData exchange = findById(exchanges, exchangeRef); + + String exchangeName = "<unknown>"; + if (exchange != null) + { + exchangeName = exchange.getStringValue("name"); + if (exchangeName.equals("")) + { + exchangeName = "''"; + } + } + + String bindingKey = binding.getStringValue("bindingKey"); + Map arguments = (Map)binding.getValue("arguments"); + if (arguments == null || arguments.isEmpty()) + { + System.out.printf(" bind [%s] => %s\n", bindingKey, exchangeName); + } + else + { + // If there are binding arguments then it's a headers exchange + System.out.printf(" bind [%s] => %s %s\n", bindingKey, exchangeName, arguments); + } + } + } + } + } + } + + /** + * Add an exchange using the QMF "create" method. + * @param args the exchange type is the first argument and the exchange name is the second argument. + * The remaining QMF method properties are populated form config parsed from the command line. + */ + private void addExchange(final String[] args) + { + if (args.length < 2) + { + usage(); + } + + Map<String, Object> properties = new HashMap<String, Object>(); + if (_durable) + { + properties.put("durable", true); + } + + properties.put("exchange-type", args[0]); + + if (_msgSequence) + { + properties.put(MSG_SEQUENCE, 1l); + } + + if (_ive) + { + properties.put(IVE, 1l); + } + + if (_altExchange != null) + { + properties.put("alternate-exchange", _altExchange); + } + + QmfData arguments = new QmfData(); + arguments.setValue("type", "exchange"); + arguments.setValue("name", args[1]); + arguments.setValue("properties", properties); + + try + { + _broker.invokeMethod("create", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + // passive exchange creation not implemented yet (not sure how to do it using QMF2) + } + + /** + * Add a queue using the QMF "create" method. + * @param args the queue name is the first argument. + * The remaining QMF method properties are populated form config parsed from the command line. + */ + private void addQueue(final String[] args) + { + if (args.length < 1) + { + usage(); + } + + Map<String, Object> properties = new HashMap<String, Object>(); + + for (String a : extraArguments) + { + String[] r = a.split("="); + String value = r.length == 2 ? r[1] : null; + properties.put(r[0], value); + } + + if (_durable) + { + properties.put("durable", true); + properties.put(FILECOUNT, _fileCount); + properties.put(FILESIZE, _fileSize); + } + + if (_maxQueueSize > 0) + { + properties.put(MAX_QUEUE_SIZE, _maxQueueSize); + } + + if (_maxQueueCount > 0) + { + properties.put(MAX_QUEUE_COUNT, _maxQueueCount); + } + + if (_limitPolicy.equals("reject")) + { + properties.put(POLICY_TYPE, "reject"); + } + else if (_limitPolicy.equals("flow-to-disk")) + { + properties.put(POLICY_TYPE, "flow_to_disk"); + } + else if (_limitPolicy.equals("ring")) + { + properties.put(POLICY_TYPE, "ring"); + } + else if (_limitPolicy.equals("ring-strict")) + { + properties.put(POLICY_TYPE, "ring_strict"); + } + + if (_order.equals("lvq")) + { + properties.put(LVQ, 1l); + } + else if (_order.equals("lvq-no-browse")) + { + properties.put(LVQNB, 1l); + } + + if (_altExchange != null) + { + properties.put("alternate-exchange", _altExchange); + } + + if (_flowStopSize > 0) + { + properties.put(FLOW_STOP_SIZE, _flowStopSize); + } + + if (_flowResumeSize > 0) + { + properties.put(FLOW_RESUME_SIZE, _flowResumeSize); + } + + if (_flowStopCount > 0) + { + properties.put(FLOW_STOP_COUNT, _flowStopCount); + } + + if (_flowResumeCount > 0) + { + properties.put(FLOW_RESUME_COUNT, _flowResumeCount); + } + + QmfData arguments = new QmfData(); + arguments.setValue("type", "queue"); + arguments.setValue("name", args[0]); + arguments.setValue("properties", properties); + + try + { + _broker.invokeMethod("create", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + // passive queue creation not implemented yet (not sure how to do it using QMF2) + } + + /** + * Remove an exchange using the QMF "delete" method. + * @param args the exchange name is the first argument. + * The remaining QMF method properties are populated form config parsed from the command line. + */ + private void delExchange(final String[] args) + { + if (args.length < 1) + { + usage(); + } + + QmfData arguments = new QmfData(); + arguments.setValue("type", "exchange"); + arguments.setValue("name", args[0]); + + try + { + _broker.invokeMethod("delete", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + } + + /** + * Remove a queue using the QMF "delete" method. + * @param args the queue name is the first argument. + * The remaining QMF method properties are populated form config parsed from the command line. + */ + private void delQueue(final String[] args) + { + if (args.length < 1) + { + usage(); + } + + if (_ifEmpty || _ifUnused) + { // Check the selected queue object to see if it is not empty or is in use + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + for (QmfConsoleData queue : queues) + { + ObjectId queueId = queue.getObjectId(); + String name = queue.getStringValue("name"); + if (name.equals(args[0])) + { + long msgDepth = queue.getLongValue("msgDepth"); + if (_ifEmpty == true && msgDepth > 0) + { + System.out.println("Cannot delete queue " + name + "; queue not empty"); + return; + } + + long consumerCount = queue.getLongValue("consumerCount"); + if (_ifUnused == true && consumerCount > 0) + { + System.out.println("Cannot delete queue " + name + "; queue in use"); + return; + } + } + } + } + + QmfData arguments = new QmfData(); + arguments.setValue("type", "queue"); + arguments.setValue("name", args[0]); + + try + { + _broker.invokeMethod("delete", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + } + + /** + * Add a binding using the QMF "create" method. + * @param args the exchange name is the first argument, the queue name is the second argument and the binding key + * is the third argument. + * The remaining QMF method properties are populated form config parsed from the command line. + */ + private void bind(final String[] args) + { + if (args.length < 2) + { + usage(); + } + + // Look up exchange objects to find the type of the selected exchange + String exchangeType = null; + List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange"); + for (QmfConsoleData exchange : exchanges) + { + String name = exchange.getStringValue("name"); + if (args[0].equals(name)) + { + exchangeType = exchange.getStringValue("type"); + break; + } + } + + if (exchangeType == null) + { + System.out.println("Exchange " + args[0] + " is invalid"); + return; + } + + Map<String, Object> properties = new HashMap<String, Object>(); + if (exchangeType.equals("xml")) + { + if (_file == null) + { + System.out.println("Invalid args to bind xml: need an input file or stdin"); + return; + } + + String xquery = null; + if (_file.equals("-")) + { // Read xquery off stdin + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + try + { + StringBuilder buf = new StringBuilder(); + String line; + while ((line = in.readLine()) != null) // read until eof + { + buf.append(line + "\n"); + } + xquery = buf.toString(); + } + catch (IOException ioe) + { + System.out.println("Exception " + ioe + " while reading stdin"); + return; + } + } + else + { // Read xquery from input file + File file = new File(_file); + try + { + FileInputStream fin = new FileInputStream(file); + try + { + byte content[] = new byte[(int)file.length()]; + fin.read(content); + xquery = new String(content); + } + finally + { + fin.close(); + } + } + catch (FileNotFoundException e) + { + System.out.println("File " + _file + " not found"); + return; + } + catch (IOException ioe) + { + System.out.println("Exception " + ioe + " while reading " + _file); + return; + } + } + properties.put("xquery", xquery); + } + else if (exchangeType.equals("headers")) + { + if (args.length < 5) + { + System.out.println("Invalid args to bind headers: need 'any'/'all' plus conditions"); + return; + } + String op = args[3]; + if (op.equals("all") || op.equals("any")) + { + properties.put("x-match", op); + String[] bindings = Arrays.copyOfRange(args, 4, args.length); + for (String binding : bindings) + { + if (binding.contains("=")) + { + binding = binding.split(",")[0]; + String[] kv = binding.split("="); + properties.put(kv[0], kv[1]); + } + } + } + else + { + System.out.println("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'"); + return; + } + } + + String bindingIdentifier = args[0] + "/" + args[1]; + if (args.length > 2) + { + bindingIdentifier = bindingIdentifier + "/" + args[2]; + } + + QmfData arguments = new QmfData(); + arguments.setValue("type", "binding"); + arguments.setValue("name", bindingIdentifier); + arguments.setValue("properties", properties); + + try + { + _broker.invokeMethod("create", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + } + + /** + * Remove a binding using the QMF "delete" method. + * @param args the exchange name is the first argument, the queue name is the second argument and the binding key + * is the third argument. + * The remaining QMF method properties are populated form config parsed from the command line. + */ + private void unbind(final String[] args) + { + if (args.length < 2) + { + usage(); + } + + String bindingIdentifier = args[0] + "/" + args[1]; + if (args.length > 2) + { + bindingIdentifier = bindingIdentifier + "/" + args[2]; + } + + QmfData arguments = new QmfData(); + arguments.setValue("type", "binding"); + arguments.setValue("name", bindingIdentifier); + + try + { + _broker.invokeMethod("delete", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + } + + /** + * Create an instance of QpidConfig. + * + * @param args the command line arguments. + */ + public QpidConfig(final String[] args) + { + String[] longOpts = {"help", "durable", "bindings", "broker-addr=", "file-count=", + "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=", + "order=", "sequence", "ive", "force", "force-if-not-empty", + "force-if-used", "alternate-exchange=", "passive", "timeout=", "file=", "flow-stop-size=", + "flow-resume-size=", "flow-stop-count=", "flow-resume-count=", "argument="}; + + try + { + GetOpt getopt = new GetOpt(args, "ha:bf:", longOpts); + List<String[]> optList = getopt.getOptList(); + String[] cargs = {}; + cargs = getopt.getEncArgs().toArray(cargs); + + //System.out.println("optList"); + for (String[] opt : optList) + { + //System.out.println(opt[0] + ":" + opt[1]); + + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + options(); + } + + if (opt[0].equals("-b") || opt[0].equals("--bindings")) + { + _recursive = true; + } + + if (opt[0].equals("-a") || opt[0].equals("--broker-addr")) + { + _host = opt[1]; + } + + if (opt[0].equals("-f") || opt[0].equals("--file")) + { + _file = opt[1]; + } + + if (opt[0].equals("--timeout")) + { + _connTimeout = Integer.parseInt(opt[1]); + } + + if (opt[0].equals("--alternate-exchange")) + { + _altExchange = opt[1]; + } + + if (opt[0].equals("--passive")) + { + _passive = true; + } + + if (opt[0].equals("--durable")) + { + _durable = true; + } + + if (opt[0].equals("--file-count")) + { + _fileCount = Long.parseLong(opt[1]); + } + + if (opt[0].equals("--file-size")) + { + _fileSize = Long.parseLong(opt[1]); + } + + if (opt[0].equals("--max-queue-size")) + { + _maxQueueSize = Long.parseLong(opt[1]); + } + + if (opt[0].equals("--max-queue-count")) + { + _maxQueueCount = Long.parseLong(opt[1]); + } + + if (opt[0].equals("--limit-policy")) + { + _limitPolicy = opt[1]; + } + + if (opt[0].equals("--flow-stop-size")) + { + _flowStopSize = Long.parseLong(opt[1]); + } + + if (opt[0].equals("--flow-resume-size")) + { + _flowResumeSize = Long.parseLong(opt[1]); + } + + if (opt[0].equals("--flow-stop-count")) + { + _flowStopCount = Long.parseLong(opt[1]); + } + + if (opt[0].equals("--flow-resume-count")) + { + _flowResumeCount = Long.parseLong(opt[1]); + } + + boolean validPolicy = false; + String[] validPolicies = {"none", "reject", "flow-to-disk", "ring", "ring-strict"}; + for (String i : validPolicies) + { + if (_limitPolicy.equals(i)) + { + validPolicy = true; + break; + } + } + + if (!validPolicy) + { + System.err.println("Error: Invalid --limit-policy argument"); + System.exit(1); + } + + if (opt[0].equals("--order")) + { + _order = opt[1]; + } + + boolean validOrder = false; + String[] validOrders = {"fifo", "lvq", "lvq-no-browse"}; + for (String i : validOrders) + { + if (_order.equals(i)) + { + validOrder = true; + break; + } + } + + if (!validOrder) + { + System.err.println("Error: Invalid --order argument"); + System.exit(1); + } + + if (opt[0].equals("--sequence")) + { + _msgSequence = true; + } + + if (opt[0].equals("--ive")) + { + _ive = true; + } + + if (opt[0].equals("--force")) + { + _ifEmpty = false; + _ifUnused = false; + } + + if (opt[0].equals("--force-if-not-empty")) + { + _ifEmpty = false; + } + + if (opt[0].equals("--force-if-used")) + { + _ifUnused = false; + } + + if (opt[0].equals("--argument")) + { + extraArguments.add(opt[1]); + } + } + + Connection connection = ConnectionHelper.createConnection(_host, "{reconnect: true}"); + _console = new Console(); + _console.disableEvents(); // Optimisation, as we're only doing getObjects() calls. + _console.addConnection(connection); + List<QmfConsoleData> brokers = _console.getObjects("org.apache.qpid.broker", "broker"); + if (brokers.isEmpty()) + { + System.out.println("No broker QmfConsoleData returned"); + System.exit(1); + } + + _broker = brokers.get(0); + + int nargs = cargs.length; + if (nargs == 0) + { + overview(); + } + else + { + String cmd = cargs[0]; + String modifier = ""; + + if (nargs > 1) + { + modifier = cargs[1]; + } + + if (cmd.equals("exchanges")) + { + if (_recursive) + { + exchangeListRecurse(modifier); + } + else + { + exchangeList(modifier); + } + } + else if (cmd.equals("queues")) + { + if (_recursive) + { + queueListRecurse(modifier); + } + else + { + queueList(modifier); + } + } + else if (cmd.equals("add")) + { + if (modifier.equals("exchange")) + { + addExchange(Arrays.copyOfRange(cargs, 2, cargs.length)); + } + else if (modifier.equals("queue")) + { + addQueue(Arrays.copyOfRange(cargs, 2, cargs.length)); + } + else + { + usage(); + } + } + else if (cmd.equals("del")) + { + if (modifier.equals("exchange")) + { + delExchange(Arrays.copyOfRange(cargs, 2, cargs.length)); + } + else if (modifier.equals("queue")) + { + delQueue(Arrays.copyOfRange(cargs, 2, cargs.length)); + } + else + { + usage(); + } + } + else if (cmd.equals("bind")) + { + bind(Arrays.copyOfRange(cargs, 1, cargs.length)); + } + else if (cmd.equals("unbind")) + { + unbind(Arrays.copyOfRange(cargs, 1, cargs.length)); + } + else + { + usage(); + } + } + } + catch (QmfException e) + { + System.err.println(e.toString()); + usage(); + } + catch (IllegalArgumentException e) + { + System.err.println(e.toString()); + usage(); + } + } + + /** + * Runs QpidConfig. + * @param args the command line arguments. + */ + public static void main(String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + // As of Qpid 0.16 the Session Dispatcher Thread is non-Daemon so the JVM gets prevented from exiting. + // Setting the following property to true makes it a Daemon Thread. + System.setProperty("qpid.jms.daemon.dispatcher", "true"); + + QpidConfig qpidConfig = new QpidConfig(args); + } +} diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java new file mode 100644 index 0000000000..fa06b06519 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java @@ -0,0 +1,358 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +// JMS Imports +import javax.jms.Connection; + +// Misc Imports +import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfData; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.console.Agent; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.MethodResult; +import org.apache.qpid.qmf2.console.QmfConsoleData; + +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; + +// Reuse this class as it provides a handy mechanism to parse an args String into a Map +import org.apache.qpid.messaging.util.AddressParser; + +/** + * A tool to allow QMF2 methods to be invoked from the command line. + * <pre> + * Usage: QpidCtrl [options] command [args] + * The args need to be in a Stringified Map format (similar to an Address String) + * e.g. to set broker log level: QpidCtrl setLogLevel "{level:\"debug+:Broker\"}" + * The listValues command lists property names and values of the specified object. + * The listObjects command lists all objects of the specified package and class. + * + * Options: + * -h, --help show this help message and exit + * -v enable logging + * -a <address>, --broker-address=<address> + * broker-addr is in the form: [username/password@] + * hostname | ip-address [:<port>] ex: localhost, + * 10.1.1.7:10000, broker-host:10000, + * guest/guest@localhost + * -c <class>, --class=<class> + * class of object on which command is being invoked + * (default broker) + * -p <package>, --package=<package> + * package of object on which command is being invoked + * (default org.apache.qpid.broker) + * -i <id>, --id=<id> identifier of object on which command is being invoked + * (default amqp-broker) + * --agent=<agent name> + * The name of the Agent to which commands will be sent + * This will try to match <agent name> against the Agent name + * the Agent product name and will also check if the Agent name + * contains the <agent name> String + * (default qpidd) + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * </pre> + * Examples (Note the quotes and escaped quotes are significant!): + * <p> + * Get the current broker log level: + * <pre>QpidCtrl getLogLevel</pre> + * + * Set the current broker log level to notice+: + * <pre>QpidCtrl setLogLevel "{level:\"notice+\"}"</pre> + * + * Set the current broker log level to debug+ for all Management Objects: + * <pre>QpidCtrl setLogLevel "{level:\"debug+\"}"</pre> + * + * Set the current broker log level to debug+ for just the Broker Management Object: + * <pre>QpidCtrl setLogLevel "{level:\"debug+:Broker\"}"</pre> + * + * List the properties of the qmf.default.direct exchange: + * <pre>QpidCtrl -c exchange -i qmf.default.direct listValues</pre> + * + * Create a queue called test with a flow-to-disk limit policy: + * <pre>QpidCtrl create "{type:queue,name:test,properties:{'qpid.policy_type':ring}}"</pre> + * + * Delete a queue called test: + * <pre>QpidCtrl delete "{type:queue,name:test}"</pre> + * + * Create a binding called bind1 between the amq.match exchange and the test queue matching the headers name=fadams + * and gender=male: + * <pre>QpidCtrl create "{type:binding,name:'amq.match/test/bind1',properties:{x-match:all,name:fadams,gender:male}}"</pre> + * + * Delete the binding called bind1 between the amq.match exchange and the test queue: + * <pre>QpidCtrl delete "{type:binding,name:'amq.match/test/bind1'}"</pre> + * + * Get the broker to echo a message: + * <pre>QpidCtrl echo "{sequence:1234,body:'Peaches En Regalia'}"</pre> + * + * Invoke the event method on the gizmo Agent (launch gizmo Agent via AgentTest): + * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL --agent=gizmo event</pre> + * + * Invoke the create_child method on the gizmo Agent (launch gizmo Agent via AgentTest): + * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL --agent=gizmo create_child "{name:monkeyBoy}"</pre> + * + * Invoke the stop method on the gizmo Agent (launch gizmo Agent via AgentTest): + * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL --agent=gizmo stop "{message:'Will I dream?'}"</pre> + * + * @author Fraser Adams + */ +public final class QpidCtrl +{ + private static final String _usage = + "Usage: QpidCtrl [options] command [args]\n" + + "The args need to be in a Stringified Map format (similar to an Address String)\n" + + "e.g. to set broker log level: QpidCtrl setLogLevel \"{level:\\\"debug+:Broker\\\"}\"\n" + + "The listValues command lists property names and values of the specified object.\n" + + "The listObjects command lists all objects of the specified package and class.\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + " -v enable logging\n" + + " -a <address>, --broker-address=<address>\n" + + " broker-addr is in the form: [username/password@]\n" + + " hostname | ip-address [:<port>] ex: localhost,\n" + + " 10.1.1.7:10000, broker-host:10000,\n" + + " guest/guest@localhost\n" + + " -c <class>, --class=<class>\n" + + " class of object on which command is being invoked\n" + + " (default broker)\n" + + " -p <package>, --package=<package>\n" + + " package of object on which command is being invoked\n" + + " (default org.apache.qpid.broker)\n" + + " -i <id>, --id=<id> identifier of object on which command is being invoked\n" + + " (default amqp-broker)\n" + + " --agent=<agent name>\n" + + " The name of the Agent to which commands will be sent\n" + + " This will try to match <agent name> against the Agent name,\n" + + " the Agent product name and will also check if the Agent name\n" + + " contains the <agent name> String\n" + + " (default qpidd)\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n"; + + private Console _console; + + /** + * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection. + * @param url the Connection URL. + * @param connectionOptions the connection options String to pass to ConnectionHelper. + * @param pkg the package name of the object we're invoking the method on. + * @param cls the class name of the object we're invoking the method on. + * @param id the ObjectId name of the object we're invoking the method on. + * @param agentName the name of the Agent to invoke the QMF method on. + * @param command the QMF method we're invoking. + * @param args the Stringified Map form of the method arguments. + */ + public QpidCtrl(final String url, final String connectionOptions, final String pkg, final String cls, + final String id, final String agentName, final String command, final String args) + { + try + { + Connection connection = ConnectionHelper.createConnection(url, connectionOptions); + _console = new Console(); + _console.addConnection(connection); + + // Find the specified Agent + Agent agent = _console.findAgent(agentName); + if (agent == null) + { + System.out.println("Agent " + agentName + " not found"); + System.exit(1); + } + + List<Agent> agentList = Arrays.asList(new Agent[] {agent}); + List<QmfConsoleData> objects = _console.getObjects(pkg, cls, agentList); + + // Parse the args String + QmfData inArgs = (args == null) ? new QmfData() : new QmfData(new AddressParser(args).map()); + + // Find the required QmfConsoleData object and invoke the specified command + MethodResult results = null; + for (QmfConsoleData object : objects) + { + String objectName = object.getObjectId().getObjectName(); + if (command.equals("listObjects")) + { + System.out.println(objectName); + } + else + { + if (objectName.contains(id)) + { // Use contains as ObjectNames may comprise other identifiers tha make using equals impractical + if (command.equals("listValues")) + { + object.listValues(); + System.exit(1); + } + else + { + results = object.invokeMethod(command, inArgs); + } + break; + } + } + } + + if (results == null) + { + if (objects.size() == 0) + { + System.out.println("getObjects(" + pkg + ", " + cls + ", " + agentName + ") returned no objects."); + } + else + { + System.out.println("Id " + id + " not found in " + pkg + ":" + cls); + } + } + else + { + if (results.succeeded()) + { + results.listValues(); + } + else + { + System.err.println ("QmfException " + results.getQmfException().getMessage() + + " returned from " + command + " method"); + } + } + } + catch (QmfException qmfe) + { + System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidCtrl constructor"); + } + } + + /** + * Runs QpidCtrl. + * @param args the command line arguments. + */ + public static void main(final String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + // As of Qpid 0.16 the Session Dispatcher Thread is non-Daemon so the JVM gets prevented from exiting. + // Setting the following property to true makes it a Daemon Thread. + System.setProperty("qpid.jms.daemon.dispatcher", "true"); + + String[] longOpts = {"help", "broker-address=", "class=", "package=", "id=", "agent=", "sasl-mechanism="}; + try + { + String host = "localhost"; + String connectionOptions = "{reconnect: true}"; + String cls = "broker"; + String pkg = "org.apache.qpid.broker"; + String id = "amqp-broker"; + String agentName = "qpidd"; + String command = null; + String arg = null; + + GetOpt getopt = new GetOpt(args, "ha:c:p:i:v", longOpts); + List<String[]> optList = getopt.getOptList(); + String[] cargs = {}; + cargs = getopt.getEncArgs().toArray(cargs); + + for (String[] opt : optList) + { + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + System.out.println(_usage); + System.out.println(_options); + System.exit(1); + } + else if (opt[0].equals("-a") || opt[0].equals("--broker-address")) + { + host = opt[1]; + } + else if (opt[0].equals("-c") || opt[0].equals("--class")) + { + cls = opt[1]; + } + else if (opt[0].equals("-p") || opt[0].equals("--package")) + { + pkg = opt[1]; + } + else if (opt[0].equals("-i") || opt[0].equals("--id")) + { + id = opt[1]; + } + else if (opt[0].equals("--agent")) + { + agentName = opt[1]; + } + else if (opt[0].equals("-v")) + { + System.setProperty("amqj.logging.level", "DEBUG"); + } + else if (opt[0].equals("--sasl-mechanism")) + { + connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}"; + } + } + + if (cargs.length < 1 || cargs.length > 2) + { + System.out.println(Arrays.asList(cargs)); + System.out.println(_usage); + System.exit(1); + } + + command = cargs[0]; + + if (cargs.length == 2) + { + arg = cargs[1]; + if (!arg.startsWith("{") || !arg.endsWith("}")) + { + System.out.println("Incorrect format for args."); + System.out.println("This needs to be in a Stringified Map format similar to an Address String"); + System.exit(1); + } + } + + QpidCtrl qpidCtrl = new QpidCtrl(host, connectionOptions, pkg, cls, id, agentName, command, arg); + } + catch (IllegalArgumentException e) + { + System.out.println(_usage); + System.out.println(e.getMessage()); + System.exit(1); + } + } +} diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java new file mode 100644 index 0000000000..34a38bf8d7 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java @@ -0,0 +1,210 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +// JMS Imports +import javax.jms.Connection; + +// Misc Imports +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Map; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.WorkItem; +import org.apache.qpid.qmf2.console.Agent; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.EventReceivedWorkItem; +import org.apache.qpid.qmf2.console.QmfConsoleData; +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; +import static org.apache.qpid.qmf2.common.WorkItem.WorkItemType.*; + +/** + * Collect and print events from one or more Qpid message brokers. + * <pre> + * If no broker-addr is supplied, QpidPrintEvents connects to 'localhost:5672'. + * + * [broker-addr] syntax: + * + * [username/password@] hostname + * ip-address [:<port>] + * + * Examples: + * + * $ QpidPrintEvents localhost:5672 + * $ QpidPrintEvents 10.1.1.7:10000 + * $ QpidPrintEvents guest/guest@broker-host:10000 + * + * Options: + * -h, --help show this help message and exit + * --heartbeats Use heartbeats. + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * </pre> + * @author Fraser Adams + */ +public final class QpidPrintEvents implements QmfEventListener +{ + private static final String _usage = + "Usage: QpidPrintEvents [options] [broker-addr]...\n"; + + private static final String _description = + "Collect and print events from one or more Qpid message brokers.\n" + + "\n" + + "If no broker-addr is supplied, QpidPrintEvents connects to 'localhost:5672'.\n" + + "\n" + + "[broker-addr] syntax:\n" + + "\n" + + "[username/password@] hostname\n" + + "ip-address [:<port>]\n" + + "\n" + + "Examples:\n" + + "\n" + + "$ QpidPrintEvents localhost:5672\n" + + "$ QpidPrintEvents 10.1.1.7:10000\n" + + "$ QpidPrintEvents guest/guest@broker-host:10000\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + " --heartbeats Use heartbeats.\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n"; + + private final String _url; + private Console _console; + + /** + * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection. + * @param url the connection URL. + * @param connectionOptions the options String to pass to ConnectionHelper. + */ + public QpidPrintEvents(final String url, final String connectionOptions) + { + System.out.println("Connecting to " + url); + _url = url; + try + { + Connection connection = ConnectionHelper.createConnection(url, connectionOptions); + _console = new Console(this); + _console.addConnection(connection); + } + catch (QmfException qmfe) + { + System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidPrintEvents constructor"); + } + } + + /** + * Checks if the WorkItem is an EventReceivedWorkItem and if it is extracts and renders the QmfEvent. + * @param wi a QMF2 WorkItem object + */ + public void onEvent(final WorkItem wi) + { + if (wi instanceof EventReceivedWorkItem) + { + EventReceivedWorkItem item = (EventReceivedWorkItem)wi; + QmfEvent event = item.getEvent(); + System.out.println(event + " broker=" + _url); + } + } + + /** + * Runs QpidPrintEvents. + * @param args the command line arguments. + */ + public static void main(final String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + String[] longOpts = {"help", "heartbeats", "sasl-mechanism="}; + try + { + String connectionOptions = "{reconnect: true}"; + GetOpt getopt = new GetOpt(args, "h", longOpts); + List<String[]> optList = getopt.getOptList(); + String[] cargs = {}; + cargs = getopt.getEncArgs().toArray(cargs); + for (String[] opt : optList) + { + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + System.out.println(_usage); + System.out.println(_description); + System.out.println(_options); + System.exit(1); + } + else if (opt[0].equals("--heartbeats")) + { + // Ignore Java uses heartbeats by default + } + else if (opt[0].equals("--sasl-mechanism")) + { + connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}"; + } + } + + int nargs = cargs.length; + if (nargs == 0) + { + cargs = new String[] {"localhost"}; + } + + for (String url : cargs) + { + QpidPrintEvents eventPrinter = new QpidPrintEvents(url, connectionOptions); + } + } + catch (IllegalArgumentException e) + { + System.out.println(_usage); + System.exit(1); + } + + BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in)); + try + { // Blocks here until return is pressed + System.out.println("Hit Return to exit"); + String s = commandLine.readLine(); + System.exit(0); + } + catch (IOException e) + { + System.out.println ("QpidPrintEvents main(): IOException: " + e.getMessage()); + } + } +} diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java new file mode 100644 index 0000000000..4ac434dc17 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java @@ -0,0 +1,374 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +// JMS Imports +import javax.jms.Connection; + +// Misc Imports +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.QmfQuery; +import org.apache.qpid.qmf2.common.QmfQueryTarget; +import org.apache.qpid.qmf2.common.SchemaClassId; +import org.apache.qpid.qmf2.common.WorkItem; +import org.apache.qpid.qmf2.console.Agent; +import org.apache.qpid.qmf2.console.AgentHeartbeatWorkItem; +import org.apache.qpid.qmf2.console.AgentRestartedWorkItem; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.QmfConsoleData; + +import org.apache.qpid.qmf2.console.SubscribeIndication; +import org.apache.qpid.qmf2.console.SubscribeParams; +import org.apache.qpid.qmf2.console.SubscriptionIndicationWorkItem; + +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; + +/** + * Collect and print queue statistics. + * <pre> + * Usage: QpidQueueStats [options] + * + * Options: + * -h, --help show this help message and exit + * -a <address>, --broker-address=<address> + * broker-addr is in the form: [username/password@] + * hostname | ip-address [:<port>] ex: localhost, + * 10.1.1.7:10000, broker-host:10000, + * guest/guest@localhost + * -f <filter>, --filter=<filter> + * a list of comma separated queue names (regex are + * accepted) to show + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * </pre> + * @author Fraser Adams + */ +public final class QpidQueueStats implements QmfEventListener +{ + private final class Stats + { + private final String _name; + private QmfConsoleData _data; + + public Stats(final String name, final QmfConsoleData data) + { + _name = name; + _data = data; + } + + public String getName() + { + return _name; + } + + public QmfConsoleData getData() + { + return _data; + } + + public void setData(final QmfConsoleData data) + { + _data = data; + } + } + + private static final String _usage = + "Usage: QpidQueueStats [options]\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + " -a <address>, --broker-address=<address>\n" + + " broker-addr is in the form: [username/password@]\n" + + " hostname | ip-address [:<port>] ex: localhost,\n" + + " 10.1.1.7:10000, broker-host:10000,\n" + + " guest/guest@localhost\n" + + " -f <filter>, --filter=<filter>\n" + + " a list of comma separated queue names (regex are\n" + + " accepted) to show\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n"; + + private final String _url; + private final List<Pattern> _filter; + private Agent _broker; + private Console _console; + private Map<ObjectId, Stats> _objects = new HashMap<ObjectId, Stats>(); + private String _subscriptionId = null; + private long _subscriptionDuration; + private long _startTime; + + /** + * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection. + * @param url the connection URL. + * @param connectionOptions the options String to pass to ConnectionHelper. + * @param filter a list of regex Patterns used to choose the queues we wish to display. + */ + public QpidQueueStats(final String url, final String connectionOptions, final List<Pattern> filter) + { + System.out.println("Connecting to " + url); + if (filter.size() > 0) + { + System.out.println("Filter = " + filter); + } + _url = url; + _filter = filter; + try + { + Connection connection = ConnectionHelper.createConnection(url, connectionOptions); + _console = new Console(this); + _console.addConnection(connection); + + // Wait until the broker Agent has been discovered + _broker = _console.findAgent("broker"); + if (_broker != null) + { + createQueueSubscription(); + } + + System.out.println("Hit Return to exit"); + System.out.println( + "Queue Name Sec Depth Enq Rate Deq Rate"); + System.out.println( + "============================================================================================="); + } + catch (QmfException qmfe) + { + System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidQueueStats constructor"); + } + } + + /** + * Create a Subscription to query for all queue objects + */ + private void createQueueSubscription() + { + try + { // This QmfQuery simply does an ID query for objects with the className "queue" + QmfQuery query = new QmfQuery(QmfQueryTarget.OBJECT, new SchemaClassId("queue")); + SubscribeParams params = _console.createSubscription(_broker, query, "queueStatsHandle"); + _subscriptionId = params.getSubscriptionId(); + _subscriptionDuration = params.getLifetime() - 10; // Subtract 10 as we want to refresh before it times out + _startTime = System.currentTimeMillis(); + } + catch (QmfException qmfe) + { + } + } + + /** + * Main Event handler. Checks if the WorkItem is a SubscriptionIndicationWorkItem, if it is it stores the object + * in a Map and uses this to maintain state so we can record deltas such as enqueue and dequeue rates. + * <p> + * The AgentHeartbeatWorkItem is used to periodically compare the elapsed time against the Subscription duration + * so that we can refresh the Subscription (or create a new one if necessary) in order to continue receiving + * queue Management Object data from the broker. + * <p> + * When the AgentRestartedWorkItem is received we clear the state to remove any stale queue Management Objects. + * @param wi a QMF2 WorkItem object + */ + public void onEvent(final WorkItem wi) + { + if (wi instanceof AgentHeartbeatWorkItem && _subscriptionId != null) + { + long elapsed = (long)Math.round((System.currentTimeMillis() - _startTime)/1000.0f); + if (elapsed > _subscriptionDuration) + { + try + { + _console.refreshSubscription(_subscriptionId); + _startTime = System.currentTimeMillis(); + } + catch (QmfException qmfe) + { + System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidQueueStats onEvent"); + createQueueSubscription(); + } + } + } + else if (wi instanceof AgentRestartedWorkItem) + { + _objects.clear(); + } + else if (wi instanceof SubscriptionIndicationWorkItem) + { + SubscriptionIndicationWorkItem item = (SubscriptionIndicationWorkItem)wi; + SubscribeIndication indication = item.getSubscribeIndication(); + String correlationId = indication.getConsoleHandle(); + if (correlationId.equals("queueStatsHandle")) + { // If it is (and it should be!!) then it's our queue object Subscription + List<QmfConsoleData> data = indication.getData(); + for (QmfConsoleData record : data) + { + ObjectId id = record.getObjectId(); + if (record.isDeleted()) + { // If the object was deleted by the Agent we remove it from out Map + _objects.remove(id); + } + else + { + if (_objects.containsKey(id)) + { // If the object is already in the Map it's likely to be a statistics push from the broker. + Stats stats = _objects.get(id); + String name = stats.getName(); + + boolean matches = false; + for (Pattern x : _filter) + { // Check the queue name against the regexes in the filter List (if any) + Matcher m = x.matcher(name); + if (m.find()) + { + matches = true; + break; + } + } + + if (_filter.isEmpty() || matches) + { // If there's no filter enabled or the filter matches the queue name we display statistics. + QmfConsoleData lastSample = stats.getData(); + stats.setData(record); + + float deltaTime = record.getUpdateTime() - lastSample.getUpdateTime(); + if (deltaTime > 1000000000.0f) + { + float deltaEnqueues = record.getLongValue("msgTotalEnqueues") - + lastSample.getLongValue("msgTotalEnqueues"); + float deltaDequeues = record.getLongValue("msgTotalDequeues") - + lastSample.getLongValue("msgTotalDequeues"); + long msgDepth = record.getLongValue("msgDepth"); + float enqueueRate = deltaEnqueues/(deltaTime/1000000000.0f); + float dequeueRate = deltaDequeues/(deltaTime/1000000000.0f); + + System.out.printf("%-46s%10.2f%11d%13.2f%13.2f\n", + name, deltaTime/1000000000, msgDepth, enqueueRate, dequeueRate); + } + } + } + else + { // If the object isn't in the Map it's likely to be a properties push from the broker. + if (!record.hasValue("name")) + { // This probably won't happen, but if it does we refresh the object to get its full state. + try + { + record.refresh(); + } + catch (QmfException qmfe) + { + } + } + String queueName = record.getStringValue("name"); + _objects.put(id, new Stats(queueName, record)); + } + } + } + } + } + } + + /** + * Runs QpidQueueStats. + * @param args the command line arguments. + */ + public static void main(final String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + String[] longOpts = {"help", "broker-address=", "filter=", "sasl-mechanism="}; + try + { + String host = "localhost"; + String connectionOptions = "{reconnect: true}"; + List<Pattern> filter = new ArrayList<Pattern>(); + GetOpt getopt = new GetOpt(args, "ha:f:", longOpts); + List<String[]> optList = getopt.getOptList(); + + for (String[] opt : optList) + { + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + System.out.println(_usage); + System.out.println(_options); + System.exit(1); + } + else if (opt[0].equals("-a") || opt[0].equals("--broker-address")) + { + host = opt[1]; + } + else if (opt[0].equals("-f") || opt[0].equals("--filter")) + { + String[] split = opt[1].split(","); + for (String s : split) + { + Pattern p = Pattern.compile(s); + filter.add(p); + } + } + else if (opt[0].equals("--sasl-mechanism")) + { + connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}"; + } + } + + QpidQueueStats queueStats = new QpidQueueStats(host, connectionOptions, filter); + } + catch (IllegalArgumentException e) + { + System.out.println(_usage); + System.out.println(e.getMessage()); + System.exit(1); + } + + BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in)); + try + { // Blocks here until return is pressed + String s = commandLine.readLine(); + System.exit(0); + } + catch (IOException e) + { + System.out.println ("QpidQueueStats main(): IOException: " + e.getMessage()); + } + } +} diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java new file mode 100644 index 0000000000..31368ed242 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java @@ -0,0 +1,364 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +// JMS Imports +import javax.jms.Connection; + +// Misc Imports +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfData; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.SchemaClassId; +import org.apache.qpid.qmf2.common.WorkItem; +import org.apache.qpid.qmf2.console.Agent; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.EventReceivedWorkItem; +import org.apache.qpid.qmf2.console.QmfConsoleData; + +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; + +/** + * QueueFuse provides protection to message producers from consumers who can't consume messages fast enough. + * <p> + * With the default "reject" limit policy when a queue exceeds its capacity an exception is thrown to the + * producer. This behaviour is unfortunate, because if there happen to be multiple consumers consuming + * messages from a given producer it is possible for a single slow consumer to cause message flow to be + * stopped to <u>all</u> consumers, in other words a de-facto denial of service may take place. + * <p> + * In an Enterprise environment it is likely that this sort of behaviour is unwelcome, so QueueFuse makes it + * possible for queueThresholdExceeded Events to be detected and for the offending queues to have messages + * purged, thus protecting the other consumers by preventing an exception being thrown to the message producer. + * <p> + * The original intention with this class was to unbind bindings to queues that exceed the threshold. This method + * works, but it has a number of disadvantages. In particular there is no way to unbind from (and thus protect) + * queues bound to the default direct exchange, in addition in order to unbind it is necessary to retrieve + * binding and exchange information, both of which require further exchanges with the broker (which is not + * desirable as when the queueThresholdExceeded occurs we need to act pretty quickly). Finally as it happens + * it is also necessary to purge some messages after unbinding anyway as if this is not done the queue remains + * in the flowStopped state and producers will eventually time out and throw an exception if this is not cleared. + * So all in all simply purging each time we cross the threshold is simpler and has the additional advantage that + * if and when the consumer speeds up message delivery will eventually return to normal. + * + * <pre> + * Usage: QueueFuse [options] [broker-addr]... + * + * Monitors one or more Qpid message brokers for queueThresholdExceeded Events. + * + * If a queueThresholdExceeded Event occurs messages are purged from the queue, + * in other words this class behaves rather like a fuse 'blowing' if the + * threshold gets exceeded. + * + * If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'. + * + * [broker-addr] syntax: + * + * [username/password@] hostname + * ip-address [:<port>] + * + * Examples: + * + * $ QueueFuse localhost:5672 + * $ QueueFuse 10.1.1.7:10000 + * $ QueueFuse guest/guest@broker-host:10000 + * + * Options: + * -h, --help show this help message and exit + * -f <filter>, --filter=<filter> + * a list of comma separated queue names (regex are + * accepted) to protect (default is to protect all). + * -p <PERCENT>, --purge=<PERCENT>\n" + + * The percentage of messages to purge when the queue\n" + + * threshold gets exceeded (default = 20%).\n" + + * N.B. if this gets set too low the fuse may not blow.\n" + + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * </pre> + * @author Fraser Adams + */ +public final class QueueFuse implements QmfEventListener +{ + private static final String _usage = + "Usage: QueueFuse [options] [broker-addr]...\n"; + + private static final String _description = + "Monitors one or more Qpid message brokers for queueThresholdExceeded Events.\n" + + "\n" + + "If a queueThresholdExceeded Event occurs messages are purged from the queue,\n" + + "in other words this class behaves rather like a fuse 'blowing' if the\n" + + "threshold gets exceeded.\n" + + "\n" + + "If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.\n" + + "\n" + + "[broker-addr] syntax:\n" + + "\n" + + "[username/password@] hostname\n" + + "ip-address [:<port>]\n" + + "\n" + + "Examples:\n" + + "\n" + + "$ QueueFuse localhost:5672\n" + + "$ QueueFuse 10.1.1.7:10000\n" + + "$ QueueFuse guest/guest@broker-host:10000\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + " -f <filter>, --filter=<filter>\n" + + " a list of comma separated queue names (regex are\n" + + " accepted) to protect (default is to protect all).\n" + + " -p <PERCENT>, --purge=<PERCENT>\n" + + " The percentage of messages to purge when the queue\n" + + " threshold gets exceeded (default = 20%).\n" + + " N.B. if this gets set too low the fuse may not blow.\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n"; + + private final String _url; + private final List<Pattern> _filter; + private final float _purge; + private Map<String, QmfConsoleData> _queueCache = new HashMap<String, QmfConsoleData>(50); + private Console _console; + + /** + * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection. + * @param url the connection URL. + * @param connectionOptions the options String to pass to ConnectionHelper. + * @param filter a list of regex Patterns used to choose the queues we wish to protect. + * @param purge the ratio of messages that we wish to purge if the threshold gets exceeded. + */ + public QueueFuse(final String url, final String connectionOptions, final List<Pattern> filter, final float purge) + { + System.out.println("QueueFuse Connecting to " + url); + if (filter.size() > 0) + { + System.out.println("Filter = " + filter); + } + _url = url; + _filter = filter; + _purge = purge; + try + { + Connection connection = ConnectionHelper.createConnection(url, connectionOptions); + _console = new Console(this); + _console.addConnection(connection); + updateQueueCache(); + } + catch (QmfException qmfe) + { + System.err.println("QmfException " + qmfe.getMessage() + " caught in QueueFuse constructor"); + } + } + + /** + * Looks up queue objects and stores them in _queueCache keyed by the queue name + */ + private void updateQueueCache() + { + _queueCache.clear(); + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + for (QmfConsoleData queue : queues) + { + String queueName = queue.getStringValue("name"); + _queueCache.put(queueName, queue); + } + } + + /** + * Look up a queue object with the given name and if it's not a ring queue invoke the queue's purge method. + * @param queueName the name of the queue to purge + * @param msgDepth the number of messages on the queue, used to determine how many messages to purge. + */ + private void purgeQueue(final String queueName, long msgDepth) + { + QmfConsoleData queue = _queueCache.get(queueName); + + if (queue == null) + { + System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s reference couldn't be found\n", + new Date().toString(), queueName); + } + else + { // If we've found a queue called queueName we then find the bindings that reference it. + + Map args = (Map)queue.getValue("arguments"); + String policyType = (String)args.get("qpid.policy_type"); + if (policyType != null && policyType.equals("ring")) + { // If qpid.policy_type=ring we return. + return; + } + + try + { + QmfData arguments = new QmfData(); + arguments.setValue("request", (long)(_purge*msgDepth)); + queue.invokeMethod("purge", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + } + } + + /** + * Main Event handler. + * @param wi a QMF2 WorkItem object + */ + public void onEvent(final WorkItem wi) + { + if (wi instanceof EventReceivedWorkItem) + { + EventReceivedWorkItem item = (EventReceivedWorkItem)wi; + Agent agent = item.getAgent(); + QmfEvent event = item.getEvent(); + String className = event.getSchemaClassId().getClassName(); + + if (className.equals("queueDeclare")) + { + updateQueueCache(); + } + else if (className.equals("queueThresholdExceeded")) + { + String queueName = event.getStringValue("qName"); + boolean matches = false; + for (Pattern x : _filter) + { // Check the queue name against the regexes in the filter List (if any) + Matcher m = x.matcher(queueName); + if (m.find()) + { + matches = true; + break; + } + } + + if (_filter.isEmpty() || matches) + { // If there's no filter enabled or the filter matches the queue name we call purgeQueue(). + long msgDepth = event.getLongValue("msgDepth"); + purgeQueue(queueName, msgDepth); + } + } + } + } + + /** + * Runs QueueFuse. + * @param args the command line arguments. + */ + public static void main(final String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + String[] longOpts = {"help", "filter=", "purge=", "sasl-mechanism="}; + try + { + boolean includeRingQueues = false; + String connectionOptions = "{reconnect: true}"; + List<Pattern> filter = new ArrayList<Pattern>(); + float purge = 0.2f; + GetOpt getopt = new GetOpt(args, "hf:p:", longOpts); + List<String[]> optList = getopt.getOptList(); + String[] cargs = {}; + cargs = getopt.getEncArgs().toArray(cargs); + for (String[] opt : optList) + { + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + System.out.println(_usage); + System.out.println(_description); + System.out.println(_options); + System.exit(1); + } + else if (opt[0].equals("-f") || opt[0].equals("--filter")) + { + String[] split = opt[1].split(","); + for (String s : split) + { + Pattern p = Pattern.compile(s); + filter.add(p); + } + } + else if (opt[0].equals("-p") || opt[0].equals("--purge")) + { + int percent = Integer.parseInt(opt[1]); + if (percent < 0 || percent > 100) + { + System.out.println(_usage); + System.exit(1); + } + purge = percent/100.0f; + } + else if (opt[0].equals("--sasl-mechanism")) + { + connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}"; + } + } + + int nargs = cargs.length; + if (nargs == 0) + { + cargs = new String[] {"localhost"}; + } + + for (String url : cargs) + { + QueueFuse queueFuse = new QueueFuse(url, connectionOptions, filter, purge); + } + } + catch (IllegalArgumentException e) + { + System.out.println(_usage); + System.out.println(e.getMessage()); + System.exit(1); + } + + try + { // Block here + Thread.currentThread().join(); + } + catch (InterruptedException ie) + { + } + } +} |