summaryrefslogtreecommitdiff
path: root/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools')
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionAudit.java474
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionLogger.java383
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java1517
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java358
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java210
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java374
-rw-r--r--qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java364
7 files changed, 3680 insertions, 0 deletions
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionAudit.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionAudit.java
new file mode 100644
index 0000000000..7f6109cb7a
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionAudit.java
@@ -0,0 +1,474 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// For DOM parsing the whitelist
+import org.w3c.dom.*;
+import javax.xml.parsers.*;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.AgentRestartedWorkItem;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * Audits connections to one or more Qpid message brokers.
+ * <pre>
+ * Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated.
+ *
+ * If no broker-addr is supplied, ConnectionAudit connects to 'localhost:5672'.
+ *
+ * [broker-addr] syntax:
+ *
+ * [username/password@] hostname
+ * ip-address [:&lt;port&gt;]
+ *
+ * Examples:
+ *
+ * $ ConnectionAudit localhost:5672
+ * $ ConnectionAudit 10.1.1.7:10000
+ * $ ConnectionAudit guest/guest@broker-host:10000
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * --sasl-mechanism=&lt;mech&gt;
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * --whitelist=&lt;whitelist XML document&gt;
+ * The fully qualified name of the whitelist XML file,
+ * default is ./whitelist.xml
+ *
+ * </pre>
+ * An example whitelist is illustrated below, note that in this example the exchanges associated with management
+ * have been whitelisted to remove spurious alerts caused by the temporary management queues.
+ * <pre>
+ *&lt;?xml version="1.0" encoding="UTF-8"?&gt;
+ *&lt;whitelist&gt;
+ * &lt;exchangeWhitelist&gt;
+ * &lt;exchange&gt;qmf.default.topic&lt;/exchange&gt;
+ * &lt;exchange&gt;qmf.default.direct&lt;/exchange&gt;
+ * &lt;exchange&gt;qpid.management&lt;/exchange&gt;
+ * &lt;exchange&gt;amq.direct&lt;/exchange&gt;
+ * &lt;exchange&gt;&lt;/exchange&gt;
+ * &lt;/exchangeWhitelist&gt;
+ * &lt;queueWhitelist&gt;
+ * &lt;queue&gt;testqueue&lt;/queue&gt;
+ * &lt;/queueWhitelist&gt;
+ *&lt;/whitelist&gt;
+ * </pre>
+
+ * @author Fraser Adams
+ */
+public final class ConnectionAudit implements QmfEventListener
+{
+ private static final String _usage =
+ "Usage: ConnectionAudit [options] [broker-addr]...\n";
+
+ private static final String _description =
+ "Audits connections to one or more Qpid message brokers.\n" +
+ "Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated.\n" +
+ "\n" +
+ "If no broker-addr is supplied, ConnectionAudit connects to 'localhost:5672'.\n" +
+ "\n" +
+ "[broker-addr] syntax:\n" +
+ "\n" +
+ "[username/password@] hostname\n" +
+ "ip-address [:<port>]\n" +
+ "\n" +
+ "Examples:\n" +
+ "\n" +
+ "$ ConnectionAudit localhost:5672\n" +
+ "$ ConnectionAudit 10.1.1.7:10000\n" +
+ "$ ConnectionAudit guest/guest@broker-host:10000\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n" +
+ " --whitelist=<whitelist XML document>\n" +
+ " The fully qualified name of the whitelist XML file,\n" +
+ " default is ./whitelist.xml\n";
+
+
+ private final String _url;
+ private final String _whitelist;
+ private long _whitelistLastModified = 0;
+ private Console _console;
+
+ // The sets to be used as the whitelists.
+ private Set<String> _exchangeWhitelist = new HashSet<String>();
+ private Set<String> _queueWhitelist = new HashSet<String>();
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations, Producers &amp; Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ * @param whitelist the path name of the whitelist XML file.
+ */
+ public ConnectionAudit(final String url, final String connectionOptions, final String whitelist)
+ {
+ System.out.println("Connecting to " + url);
+ _url = url;
+ _whitelist = whitelist;
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url, connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+ checkExistingSubscriptions();
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught in ConnectionAudit constructor");
+ }
+ }
+
+ /**
+ * When we start up we need to check any subscriptions that already exist against the whitelist.
+ * Subsequent checks are made only when we receive new subscribe events.
+ */
+ private void checkExistingSubscriptions()
+ {
+ readWhitelist();
+ List<QmfConsoleData> subscriptions = _console.getObjects("org.apache.qpid.broker", "subscription");
+ for (QmfConsoleData subscription : subscriptions)
+ {
+ QmfConsoleData queue = dereference(subscription.getRefValue("queueRef"));
+ QmfConsoleData session = dereference(subscription.getRefValue("sessionRef"));
+ QmfConsoleData connection = dereference(session.getRefValue("connectionRef"));
+
+ String queueName = queue.getStringValue("name");
+ String address = connection.getStringValue("address");
+ String timestamp = new Date(subscription.getCreateTime()/1000000l).toString();
+ validateQueue(queueName, address, timestamp);
+ }
+ }
+
+ /**
+ * Dereferences an ObjectId returning a QmfConsoleData.
+ * @param ref the ObjectId to be dereferenced.
+ * @return the dereferenced QmfConsoleData object or null if the object can't be found.
+ */
+ private QmfConsoleData dereference(final ObjectId ref)
+ {
+ List<QmfConsoleData> data = _console.getObjects(ref);
+ if (data.size() == 1)
+ {
+ return data.get(0);
+ }
+ return null;
+ }
+
+ /**
+ * Looks up the exchange and binding information from the supplied queuename then calls the main validateQueue()
+ * @param queueName the name of the queue that we want to check against the whitelists.
+ * @param exchangeName the name of the exchange that the queue we want to check against the whitelists is bound to.
+ * @param binding the binding associating queue "queueName" with exchange "exchangeName".
+ * @param address the connection address information for the subscription.
+ * @param timestamp the timestamp of the subscription.
+ */
+ private void validateQueue(final String queueName, String exchangeName, final QmfConsoleData binding,
+ final String address, final String timestamp)
+ {
+ if (_exchangeWhitelist.contains(exchangeName))
+ { // Check exchangeName against the exchangeWhitelist and if it's in there we simply return.
+ return;
+ }
+
+ if (_queueWhitelist.contains(queueName))
+ { // Check queueName against the queueWhitelist and if it's in there we simply return.
+ return;
+ }
+
+ if (exchangeName.equals(""))
+ { // Make exchangeName render more prettily if necessary.
+ exchangeName = "''";
+ }
+
+ String bindingKey = binding.getStringValue("bindingKey");
+ Map arguments = (Map)binding.getValue("arguments");
+ if (arguments.isEmpty())
+ {
+ System.out.printf("%s ALERT ConnectionAudit.validateQueue() validation failed for queue: %s with binding[%s] => %s from address: %s with connection timestamp %s\n\n", new Date().toString(), queueName, bindingKey, exchangeName, address, timestamp);
+ }
+ else
+ { // If there are binding arguments then it's a headers exchange so display accordimgly.
+ System.out.printf("%s ALERT ConnectionAudit.validateQueue() validation failed for queue: %s with binding[%s] => %s %s from address: %s with connection timestamp %s\n\n", new Date().toString(), queueName, bindingKey, exchangeName, arguments, address, timestamp);
+ }
+ }
+
+ /**
+ * Looks up the exchange and binding information from the supplied queuename then calls the main validateQueue()
+ * @param queueName the name of the queue that we want to check against the whitelists.
+ * @param address the connection address information for the subscription.
+ * @param timestamp the timestamp of the subscription.
+ */
+ private void validateQueue(final String queueName, final String address, final String timestamp)
+ {
+ ObjectId queueId = null;
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+ for (QmfConsoleData queue : queues)
+ { // We first have to find the ObjectId of the queue called queueName.
+ if (queue.getStringValue("name").equals(queueName))
+ {
+ queueId = queue.getObjectId();
+ break;
+ }
+ }
+
+ if (queueId == null)
+ {
+ System.out.printf("%s ERROR ConnectionAudit.validateQueue() %s reference couldn't be found\n",
+ new Date().toString(), queueName);
+ }
+ else
+ { // If we've got the queue's ObjectId we then find the binding that references it.
+ List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding");
+ for (QmfConsoleData binding : bindings)
+ {
+ ObjectId queueRef = binding.getRefValue("queueRef");
+ if (queueRef.equals(queueId))
+ { // We've found a binding that matches queue queueName so look up the associated exchange and validate.
+ QmfConsoleData exchange = dereference(binding.getRefValue("exchangeRef"));
+ String exchangeName = exchange.getStringValue("name");
+ validateQueue(queueName, exchangeName, binding, address, timestamp);
+ }
+ }
+ }
+ }
+
+ /**
+ * Handles WorkItems delivered by the Console.
+ * <p>
+ * If we receive an EventReceivedWorkItem check if it is a subscribe event. If it is we check if the whitelist has
+ * changed, and if it has we re-read it. We then extract the queue name, exchange name, binding, connection address
+ * and timestamp and validate with the whitelsist.
+ * <p>
+ * If we receive an AgentRestartedWorkItem we revalidate all subscriptions as it's possible that a client connection
+ * could have been made to the broker before ConnectionAudit has successfully re-established its own connections.
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof EventReceivedWorkItem)
+ {
+ EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
+ QmfEvent event = item.getEvent();
+ String className = event.getSchemaClassId().getClassName();
+ if (className.equals("subscribe"))
+ {
+ readWhitelist();
+ String queueName = event.getStringValue("qName");
+ String address = event.getStringValue("rhost");
+ String timestamp = new Date(event.getTimestamp()/1000000l).toString();
+ validateQueue(queueName, address, timestamp);
+ }
+ }
+ else if (wi instanceof AgentRestartedWorkItem)
+ {
+ checkExistingSubscriptions();
+ }
+ }
+
+ /**
+ * This method first checks if the whitelist file exists, if not it clears the sets used as whitelists
+ * so that no whitelisting is applied. If the whitelist file does exist it is parsed by a DOM parser.
+ * <p>
+ * We look for all exchange and queue elements and populate the respective whitelist sets with their
+ * contents. Note that we check the whitelist file update time to avoid reading it if it hasn't been changed
+ */
+ private void readWhitelist()
+ {
+ File file = new File(_whitelist);
+ if (file.exists())
+ {
+ long mtime = file.lastModified();
+ if (mtime != _whitelistLastModified)
+ {
+ _whitelistLastModified = mtime;
+ _exchangeWhitelist.clear();
+ _queueWhitelist.clear();
+
+ try
+ {
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder docBuilder = factory.newDocumentBuilder();
+ Document doc = docBuilder.parse(file);
+
+ Element whitelist = doc.getDocumentElement();
+ if (whitelist.getNodeName().equals("whitelist"))
+ {
+ NodeList children = whitelist.getChildNodes();
+ for (int i = 0; i < children.getLength(); i++)
+ {
+ Node child = children.item(i);
+ if (child.getNodeName().equals("exchangeWhitelist"))
+ {
+ NodeList exchanges = child.getChildNodes();
+ for (int j = 0; j < exchanges.getLength(); j++)
+ {
+ Node node = exchanges.item(j);
+ if (node.getNodeName().equals("exchange"))
+ {
+ if (node.hasChildNodes())
+ {
+ String exchange = node.getFirstChild().getNodeValue();
+ _exchangeWhitelist.add(exchange);
+ }
+ else
+ {
+ _exchangeWhitelist.add("");
+ }
+ }
+ }
+ }
+ else if (child.getNodeName().equals("queueWhitelist"))
+ {
+ NodeList queues = child.getChildNodes();
+ for (int j = 0; j < queues.getLength(); j++)
+ {
+ Node node = queues.item(j);
+ if (node.getNodeName().equals("queue"))
+ {
+ if (node.hasChildNodes())
+ {
+ String queue = node.getFirstChild().getNodeValue();
+ _queueWhitelist.add(queue);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ { // Failed to parse correctly.
+ System.out.println("Exception " + e + " while reading " + _whitelist);
+ System.out.println(new Date().toString() + " WARN ConnectionAudit.readWhitelist() " +
+ _whitelist + " failed: " + e.getMessage());
+ return;
+ }
+ }
+ }
+ else
+ { // If whitelist file doesn't exist log a warning and clear the whitelists.
+ System.out.println(new Date().toString() + " WARN ConnectionAudit.readWhitelist() " +
+ _whitelist + " doesn't exist");
+ _exchangeWhitelist.clear();
+ _queueWhitelist.clear();
+ }
+ } // End of readWhitelist()
+
+ /**
+ * Runs ConnectionAudit.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "whitelist=", "sasl-mechanism="};
+ try
+ {
+ String connectionOptions = "{reconnect: true}";
+ String whitelist = "./whitelist.xml";
+ GetOpt getopt = new GetOpt(args, "h", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_description);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("--whitelist"))
+ {
+ whitelist = opt[1];
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+ }
+ }
+
+ int nargs = cargs.length;
+ if (nargs == 0)
+ {
+ cargs = new String[] {"localhost"};
+ }
+
+ for (String url : cargs)
+ {
+ ConnectionAudit eventPrinter = new ConnectionAudit(url, connectionOptions, whitelist);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.exit(1);
+ }
+
+ try
+ { // Block here
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException ie)
+ {
+ }
+ }
+}
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionLogger.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionLogger.java
new file mode 100644
index 0000000000..f9364b4069
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ConnectionLogger.java
@@ -0,0 +1,383 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.AgentHeartbeatWorkItem;
+import org.apache.qpid.qmf2.console.AgentRestartedWorkItem;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * ConnectionLogger is a QMF2 class used to provide information about connections made to a broker.
+ * <p>
+ * In default mode ConnectionLogger lists the connections made to a broker along with information about sessions
+ * such as whether any subscriptions are associated with the session (if a session has no subscriptions then it's
+ * quite likely to be a "producer only" session, so this knowledge is quite useful).
+ * <p>
+ * In "log queue and binding" mode the information provided is very similar to qpid-config -b queues but with
+ * additional connection related information provided as with default mode.
+ *
+ * <pre>
+ * Usage: ConnectionLogger [options]
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * -q log queue and binding information for consumer connection
+ * -a &lt;address&gt;, --broker-address=&lt;address&gt;
+ * broker-addr is in the form: [username/password@]
+ * hostname | ip-address [:&lt;port&gt;] ex: localhost,
+ * 10.1.1.7:10000, broker-host:10000,
+ * guest/guest@localhost
+ * --sasl-mechanism=&lt;mech&gt;
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class ConnectionLogger implements QmfEventListener
+{
+ private static final String _usage =
+ "Usage: ConnectionLogger [options]\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " -q log queue and binding information for consumer connections\n" +
+ " -a <address>, --broker-address=<address>\n" +
+ " broker-addr is in the form: [username/password@]\n" +
+ " hostname | ip-address [:<port>] ex: localhost,\n" +
+ " 10.1.1.7:10000, broker-host:10000,\n" +
+ " guest/guest@localhost\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private Console _console;
+ private boolean _logQueues;
+
+ // If any queues get added or deleted we set this to flag that we need to re-log connections on next heartbeat.
+ private boolean _stateChanged = false;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations, Producers &amp; Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ * @param logQueues flags whether queue &amp; binding information is logged as well as connection info.
+ */
+ public ConnectionLogger(final String url, final String connectionOptions, final boolean logQueues)
+ {
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url, connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+ _logQueues = logQueues;
+ System.out.println("Hit Return to exit");
+ logConnectionInformation();
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught in ConnectionLogger constructor");
+ }
+ }
+
+ /**
+ * Finds a QmfConsoleData in a List of QMF Objects that matches a given ObjectID
+ *
+ * More or less a direct Java port of findById from qpid-config
+ *
+ * @param items the List of QMF Objects to search
+ * @param id the ObjectId we're searching the List for
+ * @return return the found object as a QmfConsoleData else return null
+ */
+ private QmfConsoleData findById(final List<QmfConsoleData> items, final ObjectId id)
+ {
+ for (QmfConsoleData item : items)
+ {
+ if (item.getObjectId().equals(id))
+ {
+ return item;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * For every queue list the bindings (equivalent of qpid-config -b queues)
+ *
+ * More or less a direct Java port of QueueListRecurse in qpid-config, which handles qpid-config -b queues
+ *
+ * @param ref If ref is null list info about all queues else list info about queue referenced by ObjectID
+ */
+ private void logQueueInformation(final ObjectId ref)
+ {
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+ List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding");
+ List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange");
+
+ for (QmfConsoleData queue : queues)
+ {
+ ObjectId queueId = queue.getObjectId();
+
+ if (ref == null || ref.equals(queueId))
+ {
+ System.out.printf(" Queue '%s'\n", queue.getStringValue("name"));
+ System.out.println(" arguments " + (Map)queue.getValue("arguments"));
+
+ for (QmfConsoleData binding : bindings)
+ {
+ ObjectId queueRef = binding.getRefValue("queueRef");
+
+ if (queueRef.equals(queueId))
+ {
+ ObjectId exchangeRef = binding.getRefValue("exchangeRef");
+ QmfConsoleData exchange = findById(exchanges, exchangeRef);
+
+ String exchangeName = "<unknown>";
+ if (exchange != null)
+ {
+ exchangeName = exchange.getStringValue("name");
+ if (exchangeName.equals(""))
+ {
+ exchangeName = "''";
+ }
+ }
+
+ String bindingKey = binding.getStringValue("bindingKey");
+ Map arguments = (Map)binding.getValue("arguments");
+ if (arguments.isEmpty())
+ {
+ System.out.printf(" bind [%s] => %s\n", bindingKey, exchangeName);
+ }
+ else
+ {
+ // If there are binding arguments then it's a headers exchange
+ System.out.printf(" bind [%s] => %s %s\n", bindingKey, exchangeName, arguments);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Logs audit information about each connection made to the broker
+ *
+ * Obtains connection, session and subscription objects and iterates in turn through these comparing
+ * references to find the subscriptions association with sessions and sessions associated with
+ * connections. Ultimately it then uses logQueueInformation to display the queues associated with
+ * each subscription.
+ */
+ private void logConnectionInformation()
+ {
+ System.out.println("\n\n**** ConnectionLogger: Logging current connection information ****");
+
+ List<QmfConsoleData> connections = _console.getObjects("org.apache.qpid.broker", "connection");
+ List<QmfConsoleData> sessions = _console.getObjects("org.apache.qpid.broker", "session");
+ List<QmfConsoleData> subscriptions = _console.getObjects("org.apache.qpid.broker", "subscription");
+
+ for (QmfConsoleData connection : connections)
+ {
+ System.out.printf("\nConnection '%s'\n", connection.getStringValue("address"));
+
+ String[] properties = {"authIdentity","remoteProcessName", "federationLink"};
+ for (String p : properties)
+ {
+ System.out.println(p + ": " + connection.getStringValue(p));
+ }
+
+ System.out.println("createTimestamp: " + new Date(connection.getCreateTime()/1000000l));
+
+ ObjectId connectionId = connection.getObjectId();
+ for (QmfConsoleData session : sessions)
+ { // Iterate through all session objects
+ ObjectId connectionRef = session.getRefValue("connectionRef");
+ if (connectionRef.equals(connectionId))
+ { // But only select sessions that are associated with the connection under consideration.
+ System.out.printf("Session '%s'\n", session.getStringValue("name"));
+ int subscriptionCount = 0;
+ ObjectId sessionId = session.getObjectId();
+ for (QmfConsoleData subscription : subscriptions)
+ { // Iterate through all subscription objects
+ ObjectId sessionRef = subscription.getRefValue("sessionRef");
+ if (sessionRef.equals(sessionId))
+ { // But only select subscriptions that are associated with the session under consideration.
+ subscriptionCount++;
+ ObjectId queueRef = subscription.getRefValue("queueRef");
+ if (_logQueues)
+ {
+ logQueueInformation(queueRef);
+ }
+ }
+ }
+ if (subscriptionCount == 0)
+ {
+ System.out.println(" ** No Subscriptions for this Session - probably a producer only Session **");
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Listener for QMF2 WorkItems
+ * <p>
+ * This method looks for clientConnect or clientDisconnect Events and uses these as a trigger to log the new
+ * connection state when the next Heartbeat occurs.
+ * <p>
+ * There are a couple of reasons for using this approach rather than just calling logConnectionInformation()
+ * as soon as we see the clientConnect or clientDisconnect Event.
+ * <p>
+ * 1. We could potentially have lots of connection Events and redisplaying all of the connections for each
+ * Event is likely to be confusing.
+ * <p>
+ * 2. When a clientConnect Event occurs we don't have all of the informatin that we might need, for example this
+ * application checks the Session and Subscription information and also optionally Queue and Binding information.
+ * Creating Sessions/Subscriptions won't generally occur until some (possibly small, but possibly not) time
+ * after the Connection has been made. The approach taken here reduces spurious assertions that a Session is
+ * probably a "producer only" Session. As one of the use-cases for this tool is to attempt to flag up "producer
+ * only" Sessions we want to try and make it as reliable as possible.
+ *
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof EventReceivedWorkItem)
+ {
+ EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
+ Agent agent = item.getAgent();
+ QmfEvent event = item.getEvent();
+
+ String className = event.getSchemaClassId().getClassName();
+ if (className.equals("clientConnect") ||
+ className.equals("clientDisconnect"))
+ {
+ _stateChanged = true;
+ }
+ }
+ else if (wi instanceof AgentRestartedWorkItem)
+ {
+ _stateChanged = true;
+ }
+ else if (wi instanceof AgentHeartbeatWorkItem)
+ {
+ AgentHeartbeatWorkItem item = (AgentHeartbeatWorkItem)wi;
+ Agent agent = item.getAgent();
+
+ if (_stateChanged && agent.getName().contains("qpidd"))
+ {
+ logConnectionInformation();
+ _stateChanged = false;
+ }
+ }
+ }
+
+ /**
+ * Runs ConnectionLogger.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "broker-address=", "sasl-mechanism="};
+ try
+ {
+ String host = "localhost";
+ String connectionOptions = "{reconnect: true}";
+ boolean logQueues = false;
+
+ GetOpt getopt = new GetOpt(args, "ha:q", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("-a") || opt[0].equals("--broker-address"))
+ {
+ host = opt[1];
+ }
+ else if (opt[0].equals("-q"))
+ {
+ logQueues = true;
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+ }
+ }
+
+ ConnectionLogger logger = new ConnectionLogger(host, connectionOptions, logQueues);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.out.println(e.getMessage());
+ System.exit(1);
+ }
+
+ BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
+ try
+ { // Blocks here until return is pressed
+ String s = commandLine.readLine();
+ System.exit(0);
+ }
+ catch (IOException e)
+ {
+ System.out.println ("ConnectionLogger main(): IOException: " + e.getMessage());
+ }
+ }
+}
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java
new file mode 100644
index 0000000000..e4262b0577
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidConfig.java
@@ -0,0 +1,1517 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfData;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * QpidConfig is a fairly "literal" Java port of the python qpid-config tool.
+ * <p>
+ * It's vaguely pointless, as the python qpid-config is the "canonical" qpid-config :-)
+ * Nonetheless, it's a useful intellectual exercise to illustrate using QMF2 from Java.
+ * <p>
+ * QpidConfig (unlike the python qpid-config) uses pure QMF2 for adding/deleting queues, exchanges &amp; bindings
+ * this provides useful illustration of how to do these things using the ManagementAgent method calls.
+ * <p>
+ * N.B. "create" and "delete" broker ManagementAgent methods were added in Qpid version 0.10, unfortunately these
+ * calls won't work for earlier versions of Qpid.
+ * <pre>
+ * Usage: qpid-config [OPTIONS]
+ * qpid-config [OPTIONS] exchanges [filter-string]
+ * qpid-config [OPTIONS] queues [filter-string]
+ * qpid-config [OPTIONS] add exchange &lt;type&gt; &lt;name&gt; [AddExchangeOptions]
+ * qpid-config [OPTIONS] del exchange &lt;name&gt;
+ * qpid-config [OPTIONS] add queue &lt;name&gt; [AddQueueOptions]
+ * qpid-config [OPTIONS] del queue &lt;name&gt; [DelQueueOptions]
+ * qpid-config [OPTIONS] bind &lt;exchange-name&gt; &lt;queue-name&gt; [binding-key]
+ * &lt;for type xml&gt; [-f -|filename]
+ * &lt;for type header&gt; [all|any] k1=v1 [, k2=v2...]
+ * qpid-config [OPTIONS] unbind &lt;exchange-name&gt; &lt;queue-name&gt; [binding-key]
+ *
+ * ADDRESS syntax:
+ *
+ * [username/password@] hostname
+ * ip-address [:&lt;port&gt;]
+ *
+ * Examples:
+ *
+ * $ qpid-config add queue q
+ * $ qpid-config add exchange direct d localhost:5672
+ * $ qpid-config exchanges 10.1.1.7:10000
+ * $ qpid-config queues guest/guest@broker-host:10000
+ *
+ * Add Exchange &lt;type&gt; values:
+ *
+ * direct Direct exchange for point-to-point communication
+ * fanout Fanout exchange for broadcast communication
+ * topic Topic exchange that routes messages using binding keys with wildcards
+ * headers Headers exchange that matches header fields against the binding keys
+ * xml XML Exchange - allows content filtering using an XQuery
+ *
+ *
+ * Queue Limit Actions
+ *
+ * none (default) - Use broker's default policy
+ * reject - Reject enqueued messages
+ * flow-to-disk - Page messages to disk
+ * ring - Replace oldest unacquired message with new
+ * ring-strict - Replace oldest message, reject if oldest is acquired
+ *
+ * Queue Ordering Policies
+ *
+ * fifo (default) - First in, first out
+ * lvq - Last Value Queue ordering, allows queue browsing
+ * lvq-no-browse - Last Value Queue ordering, browsing clients may lose data
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ *
+ * General Options:
+ * -t &lt;secs&gt;, --timeout=&lt;secs&gt;
+ * Maximum time to wait for broker connection (in
+ * seconds)
+ * -b, --bindings Show bindings in queue or exchange list
+ * -a &lt;address&gt;, --broker-addr=&lt;address&gt;
+ * Maximum time to wait for broker connection (in
+ * seconds)
+ * --sasl-mechanism=&lt;mech&gt;
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ *
+ * Options for Adding Exchanges and Queues:
+ * --alternate-exchange=&lt;aexname&gt;
+ * Name of the alternate-exchange for the new queue or
+ * exchange. Exchanges route messages to the alternate
+ * exchange if they are unable to route them elsewhere.
+ * Queues route messages to the alternate exchange if
+ * they are rejected by a subscriber or orphaned by queue
+ * deletion.
+ * --passive, --dry-run
+ * Do not actually add the exchange or queue, ensure that
+ * all parameters and permissions are correct and would
+ * allow it to be created.
+ * --durable The new queue or exchange is durable.
+ *
+ * Options for Adding Queues:
+ * --file-count=&lt;n&gt; Number of files in queue's persistence journal
+ * --file-size=&lt;n&gt; File size in pages (64Kib/page)
+ * --max-queue-size=&lt;n&gt;
+ * Maximum in-memory queue size as bytes
+ * --max-queue-count=&lt;n&gt;
+ * Maximum in-memory queue size as a number of messages
+ * --limit-policy=&lt;policy&gt;
+ * Action to take when queue limit is reached
+ * --order=&lt;ordering&gt; Queue ordering policy
+ * --flow-stop-size=&lt;n&gt;
+ * Turn on sender flow control when the number of queued
+ * bytes exceeds this value.
+ * --flow-resume-size=&lt;n&gt;
+ * Turn off sender flow control when the number of queued
+ * bytes drops below this value.
+ * --flow-stop-count=&lt;n&gt;
+ * Turn on sender flow control when the number of queued
+ * messages exceeds this value.
+ * --flow-resume-count=&lt;n&gt;
+ * Turn off sender flow control when the number of queued
+ * messages drops below this value.
+ * --argument=&lt;NAME=VALUE&gt;
+ * Specify a key-value pair to add to queue arguments
+ *
+ * Options for Adding Exchanges:
+ * --sequence Exchange will insert a 'qpid.msg_sequence' field in
+ * the message header
+ * --ive Exchange will behave as an 'initial-value-exchange',
+ * keeping a reference to the last message forwarded and
+ * enqueuing that message to newly bound queues.
+ *
+ * Options for Deleting Queues:
+ * --force Force delete of queue even if it's currently used or
+ * it's not empty
+ * --force-if-not-empty
+ * Force delete of queue even if it's not empty
+ * --force-if-not-used
+ * Force delete of queue even if it's currently used
+ *
+ * Options for Declaring Bindings:
+ * -f &lt;file.xq&gt;, --file=&lt;file.xq&gt;
+ * For XML Exchange bindings - specifies the name of a
+ * file containing an XQuery.
+ *
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QpidConfig
+{
+ private static final String _usage =
+ "Usage: qpid-config [OPTIONS]\n" +
+ " qpid-config [OPTIONS] exchanges [filter-string]\n" +
+ " qpid-config [OPTIONS] queues [filter-string]\n" +
+ " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]\n" +
+ " qpid-config [OPTIONS] del exchange <name>\n" +
+ " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]\n" +
+ " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]\n" +
+ " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]\n" +
+ " <for type xml> [-f -|filename]\n" +
+ " <for type header> [all|any] k1=v1 [, k2=v2...]\n" +
+ " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]\n";
+
+ private static final String _description =
+ "ADDRESS syntax:\n" +
+ "\n" +
+ " [username/password@] hostname\n" +
+ " ip-address [:<port>]\n" +
+ "\n" +
+ "Examples:\n" +
+ "\n" +
+ "$ qpid-config add queue q\n" +
+ "$ qpid-config add exchange direct d localhost:5672\n" +
+ "$ qpid-config exchanges 10.1.1.7:10000\n" +
+ "$ qpid-config queues guest/guest@broker-host:10000\n" +
+ "\n" +
+ "Add Exchange <type> values:\n" +
+ "\n" +
+ " direct Direct exchange for point-to-point communication\n" +
+ " fanout Fanout exchange for broadcast communication\n" +
+ " topic Topic exchange that routes messages using binding keys with wildcards\n" +
+ " headers Headers exchange that matches header fields against the binding keys\n" +
+ " xml XML Exchange - allows content filtering using an XQuery\n" +
+ "\n" +
+ "\n" +
+ "Queue Limit Actions\n" +
+ "\n" +
+ " none (default) - Use broker's default policy\n" +
+ " reject - Reject enqueued messages\n" +
+ " flow-to-disk - Page messages to disk\n" +
+ " ring - Replace oldest unacquired message with new\n" +
+ " ring-strict - Replace oldest message, reject if oldest is acquired\n" +
+ "\n" +
+ "Queue Ordering Policies\n" +
+ "\n" +
+ " fifo (default) - First in, first out\n" +
+ " lvq - Last Value Queue ordering, allows queue browsing\n" +
+ " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ "\n" +
+ " General Options:\n" +
+ " -t <secs>, --timeout=<secs>\n" +
+ " Maximum time to wait for broker connection (in\n" +
+ " seconds)\n" +
+ " -b, --bindings Show bindings in queue or exchange list\n" +
+ " -a <address>, --broker-addr=<address>\n" +
+ " Maximum time to wait for broker connection (in\n" +
+ " seconds)\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n" +
+ "\n" +
+ " Options for Adding Exchanges and Queues:\n" +
+ " --alternate-exchange=<aexname>\n" +
+ " Name of the alternate-exchange for the new queue or\n" +
+ " exchange. Exchanges route messages to the alternate\n" +
+ " exchange if they are unable to route them elsewhere.\n" +
+ " Queues route messages to the alternate exchange if\n" +
+ " they are rejected by a subscriber or orphaned by queue\n" +
+ " deletion.\n" +
+ " --passive, --dry-run\n" +
+ " Do not actually add the exchange or queue, ensure that\n" +
+ " all parameters and permissions are correct and would\n" +
+ " allow it to be created.\n" +
+ " --durable The new queue or exchange is durable.\n" +
+ "\n" +
+ " Options for Adding Queues:\n" +
+ " --file-count=<n> Number of files in queue's persistence journal\n" +
+ " --file-size=<n> File size in pages (64Kib/page)\n" +
+ " --max-queue-size=<n>\n" +
+ " Maximum in-memory queue size as bytes\n" +
+ " --max-queue-count=<n>\n" +
+ " Maximum in-memory queue size as a number of messages\n" +
+ " --limit-policy=<policy>\n" +
+ " Action to take when queue limit is reached\n" +
+ " --order=<ordering> Queue ordering policy\n" +
+ " --flow-stop-size=<n>\n" +
+ " Turn on sender flow control when the number of queued\n" +
+ " bytes exceeds this value.\n" +
+ " --flow-resume-size=<n>\n" +
+ " Turn off sender flow control when the number of queued\n" +
+ " bytes drops below this value.\n" +
+ " --flow-stop-count=<n>\n" +
+ " Turn on sender flow control when the number of queued\n" +
+ " messages exceeds this value.\n" +
+ " --flow-resume-count=<n>\n" +
+ " Turn off sender flow control when the number of queued\n" +
+ " messages drops below this value.\n" +
+ " --argument=<NAME=VALUE>\n" +
+ " Specify a key-value pair to add to queue arguments\n" +
+ "\n" +
+ " Options for Adding Exchanges:\n" +
+ " --sequence Exchange will insert a 'qpid.msg_sequence' field in\n" +
+ " the message header\n" +
+ " --ive Exchange will behave as an 'initial-value-exchange',\n" +
+ " keeping a reference to the last message forwarded and\n" +
+ " enqueuing that message to newly bound queues.\n" +
+ "\n" +
+ " Options for Deleting Queues:\n" +
+ " --force Force delete of queue even if it's currently used or\n" +
+ " it's not empty\n" +
+ " --force-if-not-empty\n" +
+ " Force delete of queue even if it's not empty\n" +
+ " --force-if-not-used\n" +
+ " Force delete of queue even if it's currently used\n" +
+ "\n" +
+ " Options for Declaring Bindings:\n" +
+ " -f <file.xq>, --file=<file.xq>\n" +
+ " For XML Exchange bindings - specifies the name of a\n" +
+ " file containing an XQuery.\n";
+
+ private Console _console;
+ private QmfConsoleData _broker;
+
+ private boolean _recursive = false;
+ private String _host = "localhost";
+ private int _connTimeout = 10;
+ private String _altExchange = null;
+ private boolean _passive = false;
+ private boolean _durable = false;
+ private boolean _ifEmpty = true;
+ private boolean _ifUnused = true;
+ private long _fileCount = 8;
+ private long _fileSize = 24;
+ private long _maxQueueSize = 0;
+ private long _maxQueueCount = 0;
+ private String _limitPolicy = "none";
+ private String _order = "fifo";
+ private boolean _msgSequence = false;
+ private boolean _ive = false;
+ private String _file = null;
+
+ // New to Qpid 0.10 qpid-config
+ private String _saslMechanism = null;
+ private long _flowStopCount = 0;
+ private long _flowResumeCount = 0;
+ private long _flowStopSize = 0;
+ private long _flowResumeSize = 0;
+ private List<String> extraArguments = new ArrayList<String>();
+
+ private static final String FILECOUNT = "qpid.file_count";
+ private static final String FILESIZE = "qpid.file_size";
+ private static final String MAX_QUEUE_SIZE = "qpid.max_size";
+ private static final String MAX_QUEUE_COUNT = "qpid.max_count";
+ private static final String POLICY_TYPE = "qpid.policy_type";
+ private static final String LVQ = "qpid.last_value_queue";
+ private static final String LVQNB = "qpid.last_value_queue_no_browse";
+ private static final String MSG_SEQUENCE = "qpid.msg_sequence";
+ private static final String IVE = "qpid.ive";
+ private static final String FLOW_STOP_COUNT = "qpid.flow_stop_count";
+ private static final String FLOW_RESUME_COUNT = "qpid.flow_resume_count";
+ private static final String FLOW_STOP_SIZE = "qpid.flow_stop_size";
+ private static final String FLOW_RESUME_SIZE = "qpid.flow_resume_size";
+
+ // There are various arguments to declare that have specific program options in this utility.
+ // However there is now a generic mechanism for passing arguments as well. The SPECIAL_ARGS
+ // set contains the arguments for which there are specific program options defined i.e. the
+ // arguments for which there is special processing on add and list.
+ private static HashSet<String> SPECIAL_ARGS = new HashSet<String>();
+
+ static
+ {
+ SPECIAL_ARGS.add(FILECOUNT);
+ SPECIAL_ARGS.add(FILESIZE);
+ SPECIAL_ARGS.add(MAX_QUEUE_SIZE);
+ SPECIAL_ARGS.add(MAX_QUEUE_COUNT);
+ SPECIAL_ARGS.add(POLICY_TYPE);
+ SPECIAL_ARGS.add(LVQ);
+ SPECIAL_ARGS.add(LVQNB);
+ SPECIAL_ARGS.add(MSG_SEQUENCE);
+ SPECIAL_ARGS.add(IVE);
+ SPECIAL_ARGS.add(FLOW_STOP_COUNT);
+ SPECIAL_ARGS.add(FLOW_RESUME_COUNT);
+ SPECIAL_ARGS.add(FLOW_STOP_SIZE);
+ SPECIAL_ARGS.add(FLOW_RESUME_SIZE);
+ }
+
+ /**
+ * Display long-form QpidConfig usage.
+ */
+ private void usage()
+ {
+ System.out.println(_usage);
+ System.exit(1);
+ }
+
+ /**
+ * Display QpidConfig options.
+ */
+ private void options()
+ {
+ System.out.println(_usage);
+ System.out.println(_description);
+ System.out.println(_options);
+ System.exit(1);
+ }
+
+ /**
+ * Finds a QmfConsoleData instance in a List of QMF Objects that matches a given ObjectID.
+ *
+ * More or less a direct Java port of findById from qpid-config.
+ *
+ * @param items the List of QMF Objects to search.
+ * @param id the ObjectId we're searching the List for.
+ * @return return the found object as a QmfConsoleData else return null.
+ */
+ private QmfConsoleData findById(final List<QmfConsoleData> items, final ObjectId id)
+ {
+ for (QmfConsoleData item : items)
+ {
+ if (item.getObjectId().equals(id))
+ {
+ return item;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Provide a basic overview of the number and type of queues and exchanges.
+ */
+ private void overview()
+ {
+ List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange");
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+
+ System.out.printf("Total Exchanges: %d\n", exchanges.size());
+
+ Map<String, AtomicInteger> etype = new HashMap<String, AtomicInteger>();
+ for (QmfConsoleData exchange : exchanges)
+ {
+ String exchangeType = exchange.getStringValue("type");
+ AtomicInteger n = etype.get(exchangeType);
+ if (n == null)
+ {
+ etype.put(exchangeType, new AtomicInteger(1));
+ }
+ else
+ {
+ n.getAndIncrement();
+ }
+ }
+
+ for (Map.Entry<String, AtomicInteger> entry : etype.entrySet())
+ {
+ System.out.printf("%15s: %s\n", entry.getKey(), entry.getValue());
+ }
+
+ System.out.println();
+ System.out.printf(" Total Queues: %d\n", queues.size());
+
+ int durable = 0;
+ for (QmfConsoleData queue : queues)
+ {
+ boolean isDurable = queue.getBooleanValue("durable");
+ if (isDurable)
+ {
+ durable++;
+ }
+ }
+
+ System.out.printf(" durable: %d\n", durable);
+ System.out.printf(" non-durable: %d\n", queues.size() - durable);
+ }
+
+ /**
+ * For every exchange list detailed info (equivalent of qpid-config exchanges).
+ *
+ * More or less a direct Java port of ExchangeList in qpid-config, which handles qpid-config exchanges.
+ *
+ * @param filter specifies the exchange name to display info for, if set to "" displays info for every exchange.
+ */
+ private void exchangeList(final String filter)
+ {
+ List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange");
+
+ String caption1 = "Type ";
+ String caption2 = "Exchange Name";
+ int maxNameLen = caption2.length();
+
+ for (QmfConsoleData exchange : exchanges)
+ {
+ String name = exchange.getStringValue("name");
+ if (filter.equals("") || filter.equals(name))
+ {
+ if (name.length() > maxNameLen)
+ {
+ maxNameLen = name.length();
+ }
+ }
+ }
+
+ System.out.printf("%s%-" + maxNameLen + "s Attributes\n", caption1, caption2);
+
+ StringBuilder buf = new StringBuilder();
+ for (int i = 0; i < (((maxNameLen + caption1.length()) / 5) + 5); i++)
+ {
+ buf.append("=====");
+ }
+ String line = buf.toString();
+ System.out.println(line);
+
+ for (QmfConsoleData exchange : exchanges)
+ {
+ String name = exchange.getStringValue("name");
+ if (filter.equals("") || filter.equals(name))
+ {
+ System.out.printf("%-10s%-" + maxNameLen + "s ", exchange.getStringValue("type"), name);
+ Map args = (Map)exchange.getValue("arguments");
+ args = (args == null) ? Collections.EMPTY_MAP : args;
+
+ if (exchange.getBooleanValue("durable"))
+ {
+ System.out.printf("--durable ");
+ }
+
+ if (args.containsKey(MSG_SEQUENCE) && QmfData.getLong(args.get(MSG_SEQUENCE)) == 1)
+ {
+ System.out.printf("--sequence ");
+ }
+
+ if (args.containsKey(IVE) && QmfData.getLong(args.get(IVE)) == 1)
+ {
+ System.out.printf("--ive ");
+ }
+
+ if (exchange.hasValue("altExchange"))
+ {
+ ObjectId altExchangeRef = exchange.getRefValue("altExchange");
+ QmfConsoleData altExchange = findById(exchanges, altExchangeRef);
+ if (altExchange != null)
+ {
+ System.out.printf("--alternate-exchange=%s", altExchange.getStringValue("name"));
+ }
+ }
+
+ System.out.println();
+ }
+ }
+ }
+
+ /**
+ * For every exchange list the bindings (equivalent of qpid-config -b exchanges).
+ *
+ * More or less a direct Java port of ExchangeListRecurse in qpid-config, which handles qpid-config -b exchanges.
+ *
+ * @param filter specifies the exchange name to display info for, if set to "" displays info for every exchange.
+ */
+ private void exchangeListRecurse(final String filter)
+ {
+ List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange");
+ List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding");
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+
+ for (QmfConsoleData exchange : exchanges)
+ {
+ ObjectId exchangeId = exchange.getObjectId();
+ String name = exchange.getStringValue("name");
+
+ if (filter.equals("") || filter.equals(name))
+ {
+ System.out.printf("Exchange '%s' (%s)\n", name, exchange.getStringValue("type"));
+ for (QmfConsoleData binding : bindings)
+ {
+ ObjectId exchangeRef = binding.getRefValue("exchangeRef");
+
+ if (exchangeRef.equals(exchangeId))
+ {
+ ObjectId queueRef = binding.getRefValue("queueRef");
+ QmfConsoleData queue = findById(queues, queueRef);
+
+ String queueName = "<unknown>";
+ if (queue != null)
+ {
+ queueName = queue.getStringValue("name");
+ if (queueName.equals(""))
+ {
+ queueName = "''";
+ }
+ }
+
+ String bindingKey = binding.getStringValue("bindingKey");
+ Map arguments = (Map)binding.getValue("arguments");
+ if (arguments == null || arguments.isEmpty())
+ {
+ System.out.printf(" bind [%s] => %s\n", bindingKey, queueName);
+ }
+ else
+ {
+ // If there are binding arguments then it's a headers exchange
+ System.out.printf(" bind [%s] => %s %s\n", bindingKey, queueName, arguments);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * For every queue list detailed info (equivalent of qpid-config queues).
+ *
+ * More or less a direct Java port of QueueList in qpid-config, which handles qpid-config queues.
+ *
+ * @param filter specifies the queue name to display info for, if set to "" displays info for every queue.
+ */
+ private void queueList(final String filter)
+ {
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+
+ String caption = "Queue Name";
+ int maxNameLen = caption.length();
+
+ for (QmfConsoleData queue : queues)
+ {
+ String name = queue.getStringValue("name");
+ if (filter.equals("") || filter.equals(name))
+ {
+ if (name.length() > maxNameLen)
+ {
+ maxNameLen = name.length();
+ }
+ }
+ }
+
+ System.out.printf("%-" + maxNameLen + "s Attributes\n", caption);
+
+ StringBuilder buf = new StringBuilder();
+ for (int i = 0; i < ((maxNameLen / 5) + 5); i++)
+ {
+ buf.append("=====");
+ }
+ String line = buf.toString();
+ System.out.println(line);
+
+ for (QmfConsoleData queue : queues)
+ {
+ String name = queue.getStringValue("name");
+ if (filter.equals("") || filter.equals(name))
+ {
+ System.out.printf("%-" + maxNameLen + "s ", name);
+ Map<String, Object> args = queue.<Map<String, Object>>getValue("arguments");
+ args = (args == null) ? Collections.EMPTY_MAP : args;
+/*System.out.println(args);
+for (Map.Entry<String, Object> entry : args.entrySet()) {
+ System.out.println(entry.getKey() + " " + entry.getValue().getClass().getCanonicalName());
+}*/
+ if (queue.getBooleanValue("durable"))
+ {
+ System.out.printf("--durable ");
+ }
+
+ if (queue.getBooleanValue("autoDelete"))
+ {
+ System.out.printf("auto-del ");
+ }
+
+ if (queue.getBooleanValue("exclusive"))
+ {
+ System.out.printf("excl ");
+ }
+
+ if (args.containsKey(FILESIZE))
+ {
+ System.out.printf("--file-size=%d ", QmfData.getLong(args.get(FILESIZE)));
+ }
+
+ if (args.containsKey(FILECOUNT))
+ {
+ System.out.printf("--file-count=%d ", QmfData.getLong(args.get(FILECOUNT)));
+ }
+
+ if (args.containsKey(MAX_QUEUE_SIZE))
+ {
+ System.out.printf("--max-queue-size=%d ", QmfData.getLong(args.get(MAX_QUEUE_SIZE)));
+ }
+
+ if (args.containsKey(MAX_QUEUE_COUNT))
+ {
+ System.out.printf("--max-queue-count=%d ", QmfData.getLong(args.get(MAX_QUEUE_COUNT)));
+ }
+
+ if (args.containsKey(POLICY_TYPE))
+ {
+ System.out.printf("--limit-policy=%s ", (QmfData.getString(args.get(POLICY_TYPE))).replace("_", "-"));
+ }
+
+ if (args.containsKey(LVQ) && QmfData.getLong(args.get(LVQ)) == 1)
+ {
+ System.out.printf("--order lvq ");
+ }
+
+ if (args.containsKey(LVQNB) && QmfData.getLong(args.get(LVQNB)) == 1)
+ {
+ System.out.printf("--order lvq-no-browse ");
+ }
+
+ if (queue.hasValue("altExchange"))
+ {
+ ObjectId altExchangeRef = queue.getRefValue("altExchange");
+ List<QmfConsoleData> altExchanges = _console.getObjects(altExchangeRef);
+ if (altExchanges.size() == 1)
+ {
+ QmfConsoleData altExchange = altExchanges.get(0);
+ System.out.printf("--alternate-exchange=%s", altExchange.getStringValue("name"));
+ }
+ }
+
+ if (args.containsKey(FLOW_STOP_SIZE))
+ {
+ System.out.printf("--flow-stop-size=%d ", QmfData.getLong(args.get(FLOW_STOP_SIZE)));
+ }
+
+ if (args.containsKey(FLOW_RESUME_SIZE))
+ {
+ System.out.printf("--flow-resume-size=%d ", QmfData.getLong(args.get(FLOW_RESUME_SIZE)));
+ }
+
+ if (args.containsKey(FLOW_STOP_COUNT))
+ {
+ System.out.printf("--flow-stop-count=%d ", QmfData.getLong(args.get(FLOW_STOP_COUNT)));
+ }
+
+ if (args.containsKey(FLOW_RESUME_COUNT))
+ {
+ System.out.printf("--flow-resume-count=%d ", QmfData.getLong(args.get(FLOW_RESUME_COUNT)));
+ }
+
+ for (Map.Entry<String, Object> entry : args.entrySet())
+ { // Display generic queue arguments
+ if (!SPECIAL_ARGS.contains(entry.getKey()))
+ {
+ System.out.printf("--argument %s=%s ", entry.getKey(), entry.getValue());
+ }
+ }
+
+ System.out.println();
+ }
+ }
+ }
+
+ /**
+ * For every queue list the bindings (equivalent of qpid-config -b queues).
+ *
+ * More or less a direct Java port of QueueListRecurse in qpid-config, which handles qpid-config -b queues.
+ *
+ * @param filter specifies the queue name to display info for, if set to "" displays info for every queue.
+ */
+ private void queueListRecurse(final String filter)
+ {
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+ List<QmfConsoleData> bindings = _console.getObjects("org.apache.qpid.broker", "binding");
+ List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange");
+
+ for (QmfConsoleData queue : queues)
+ {
+ ObjectId queueId = queue.getObjectId();
+ String name = queue.getStringValue("name");
+
+ if (filter.equals("") || filter.equals(name))
+ {
+ System.out.printf("Queue '%s'\n", name);
+
+ for (QmfConsoleData binding : bindings)
+ {
+ ObjectId queueRef = binding.getRefValue("queueRef");
+
+ if (queueRef.equals(queueId))
+ {
+ ObjectId exchangeRef = binding.getRefValue("exchangeRef");
+ QmfConsoleData exchange = findById(exchanges, exchangeRef);
+
+ String exchangeName = "<unknown>";
+ if (exchange != null)
+ {
+ exchangeName = exchange.getStringValue("name");
+ if (exchangeName.equals(""))
+ {
+ exchangeName = "''";
+ }
+ }
+
+ String bindingKey = binding.getStringValue("bindingKey");
+ Map arguments = (Map)binding.getValue("arguments");
+ if (arguments == null || arguments.isEmpty())
+ {
+ System.out.printf(" bind [%s] => %s\n", bindingKey, exchangeName);
+ }
+ else
+ {
+ // If there are binding arguments then it's a headers exchange
+ System.out.printf(" bind [%s] => %s %s\n", bindingKey, exchangeName, arguments);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Add an exchange using the QMF "create" method.
+ * @param args the exchange type is the first argument and the exchange name is the second argument.
+ * The remaining QMF method properties are populated form config parsed from the command line.
+ */
+ private void addExchange(final String[] args)
+ {
+ if (args.length < 2)
+ {
+ usage();
+ }
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+ if (_durable)
+ {
+ properties.put("durable", true);
+ }
+
+ properties.put("exchange-type", args[0]);
+
+ if (_msgSequence)
+ {
+ properties.put(MSG_SEQUENCE, 1l);
+ }
+
+ if (_ive)
+ {
+ properties.put(IVE, 1l);
+ }
+
+ if (_altExchange != null)
+ {
+ properties.put("alternate-exchange", _altExchange);
+ }
+
+ QmfData arguments = new QmfData();
+ arguments.setValue("type", "exchange");
+ arguments.setValue("name", args[1]);
+ arguments.setValue("properties", properties);
+
+ try
+ {
+ _broker.invokeMethod("create", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ // passive exchange creation not implemented yet (not sure how to do it using QMF2)
+ }
+
+ /**
+ * Add a queue using the QMF "create" method.
+ * @param args the queue name is the first argument.
+ * The remaining QMF method properties are populated form config parsed from the command line.
+ */
+ private void addQueue(final String[] args)
+ {
+ if (args.length < 1)
+ {
+ usage();
+ }
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+
+ for (String a : extraArguments)
+ {
+ String[] r = a.split("=");
+ String value = r.length == 2 ? r[1] : null;
+ properties.put(r[0], value);
+ }
+
+ if (_durable)
+ {
+ properties.put("durable", true);
+ properties.put(FILECOUNT, _fileCount);
+ properties.put(FILESIZE, _fileSize);
+ }
+
+ if (_maxQueueSize > 0)
+ {
+ properties.put(MAX_QUEUE_SIZE, _maxQueueSize);
+ }
+
+ if (_maxQueueCount > 0)
+ {
+ properties.put(MAX_QUEUE_COUNT, _maxQueueCount);
+ }
+
+ if (_limitPolicy.equals("reject"))
+ {
+ properties.put(POLICY_TYPE, "reject");
+ }
+ else if (_limitPolicy.equals("flow-to-disk"))
+ {
+ properties.put(POLICY_TYPE, "flow_to_disk");
+ }
+ else if (_limitPolicy.equals("ring"))
+ {
+ properties.put(POLICY_TYPE, "ring");
+ }
+ else if (_limitPolicy.equals("ring-strict"))
+ {
+ properties.put(POLICY_TYPE, "ring_strict");
+ }
+
+ if (_order.equals("lvq"))
+ {
+ properties.put(LVQ, 1l);
+ }
+ else if (_order.equals("lvq-no-browse"))
+ {
+ properties.put(LVQNB, 1l);
+ }
+
+ if (_altExchange != null)
+ {
+ properties.put("alternate-exchange", _altExchange);
+ }
+
+ if (_flowStopSize > 0)
+ {
+ properties.put(FLOW_STOP_SIZE, _flowStopSize);
+ }
+
+ if (_flowResumeSize > 0)
+ {
+ properties.put(FLOW_RESUME_SIZE, _flowResumeSize);
+ }
+
+ if (_flowStopCount > 0)
+ {
+ properties.put(FLOW_STOP_COUNT, _flowStopCount);
+ }
+
+ if (_flowResumeCount > 0)
+ {
+ properties.put(FLOW_RESUME_COUNT, _flowResumeCount);
+ }
+
+ QmfData arguments = new QmfData();
+ arguments.setValue("type", "queue");
+ arguments.setValue("name", args[0]);
+ arguments.setValue("properties", properties);
+
+ try
+ {
+ _broker.invokeMethod("create", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ // passive queue creation not implemented yet (not sure how to do it using QMF2)
+ }
+
+ /**
+ * Remove an exchange using the QMF "delete" method.
+ * @param args the exchange name is the first argument.
+ * The remaining QMF method properties are populated form config parsed from the command line.
+ */
+ private void delExchange(final String[] args)
+ {
+ if (args.length < 1)
+ {
+ usage();
+ }
+
+ QmfData arguments = new QmfData();
+ arguments.setValue("type", "exchange");
+ arguments.setValue("name", args[0]);
+
+ try
+ {
+ _broker.invokeMethod("delete", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ }
+
+ /**
+ * Remove a queue using the QMF "delete" method.
+ * @param args the queue name is the first argument.
+ * The remaining QMF method properties are populated form config parsed from the command line.
+ */
+ private void delQueue(final String[] args)
+ {
+ if (args.length < 1)
+ {
+ usage();
+ }
+
+ if (_ifEmpty || _ifUnused)
+ { // Check the selected queue object to see if it is not empty or is in use
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+ for (QmfConsoleData queue : queues)
+ {
+ ObjectId queueId = queue.getObjectId();
+ String name = queue.getStringValue("name");
+ if (name.equals(args[0]))
+ {
+ long msgDepth = queue.getLongValue("msgDepth");
+ if (_ifEmpty == true && msgDepth > 0)
+ {
+ System.out.println("Cannot delete queue " + name + "; queue not empty");
+ return;
+ }
+
+ long consumerCount = queue.getLongValue("consumerCount");
+ if (_ifUnused == true && consumerCount > 0)
+ {
+ System.out.println("Cannot delete queue " + name + "; queue in use");
+ return;
+ }
+ }
+ }
+ }
+
+ QmfData arguments = new QmfData();
+ arguments.setValue("type", "queue");
+ arguments.setValue("name", args[0]);
+
+ try
+ {
+ _broker.invokeMethod("delete", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ }
+
+ /**
+ * Add a binding using the QMF "create" method.
+ * @param args the exchange name is the first argument, the queue name is the second argument and the binding key
+ * is the third argument.
+ * The remaining QMF method properties are populated form config parsed from the command line.
+ */
+ private void bind(final String[] args)
+ {
+ if (args.length < 2)
+ {
+ usage();
+ }
+
+ // Look up exchange objects to find the type of the selected exchange
+ String exchangeType = null;
+ List<QmfConsoleData> exchanges = _console.getObjects("org.apache.qpid.broker", "exchange");
+ for (QmfConsoleData exchange : exchanges)
+ {
+ String name = exchange.getStringValue("name");
+ if (args[0].equals(name))
+ {
+ exchangeType = exchange.getStringValue("type");
+ break;
+ }
+ }
+
+ if (exchangeType == null)
+ {
+ System.out.println("Exchange " + args[0] + " is invalid");
+ return;
+ }
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+ if (exchangeType.equals("xml"))
+ {
+ if (_file == null)
+ {
+ System.out.println("Invalid args to bind xml: need an input file or stdin");
+ return;
+ }
+
+ String xquery = null;
+ if (_file.equals("-"))
+ { // Read xquery off stdin
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+ try
+ {
+ StringBuilder buf = new StringBuilder();
+ String line;
+ while ((line = in.readLine()) != null) // read until eof
+ {
+ buf.append(line + "\n");
+ }
+ xquery = buf.toString();
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Exception " + ioe + " while reading stdin");
+ return;
+ }
+ }
+ else
+ { // Read xquery from input file
+ File file = new File(_file);
+ try
+ {
+ FileInputStream fin = new FileInputStream(file);
+ try
+ {
+ byte content[] = new byte[(int)file.length()];
+ fin.read(content);
+ xquery = new String(content);
+ }
+ finally
+ {
+ fin.close();
+ }
+ }
+ catch (FileNotFoundException e)
+ {
+ System.out.println("File " + _file + " not found");
+ return;
+ }
+ catch (IOException ioe)
+ {
+ System.out.println("Exception " + ioe + " while reading " + _file);
+ return;
+ }
+ }
+ properties.put("xquery", xquery);
+ }
+ else if (exchangeType.equals("headers"))
+ {
+ if (args.length < 5)
+ {
+ System.out.println("Invalid args to bind headers: need 'any'/'all' plus conditions");
+ return;
+ }
+ String op = args[3];
+ if (op.equals("all") || op.equals("any"))
+ {
+ properties.put("x-match", op);
+ String[] bindings = Arrays.copyOfRange(args, 4, args.length);
+ for (String binding : bindings)
+ {
+ if (binding.contains("="))
+ {
+ binding = binding.split(",")[0];
+ String[] kv = binding.split("=");
+ properties.put(kv[0], kv[1]);
+ }
+ }
+ }
+ else
+ {
+ System.out.println("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'");
+ return;
+ }
+ }
+
+ String bindingIdentifier = args[0] + "/" + args[1];
+ if (args.length > 2)
+ {
+ bindingIdentifier = bindingIdentifier + "/" + args[2];
+ }
+
+ QmfData arguments = new QmfData();
+ arguments.setValue("type", "binding");
+ arguments.setValue("name", bindingIdentifier);
+ arguments.setValue("properties", properties);
+
+ try
+ {
+ _broker.invokeMethod("create", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ }
+
+ /**
+ * Remove a binding using the QMF "delete" method.
+ * @param args the exchange name is the first argument, the queue name is the second argument and the binding key
+ * is the third argument.
+ * The remaining QMF method properties are populated form config parsed from the command line.
+ */
+ private void unbind(final String[] args)
+ {
+ if (args.length < 2)
+ {
+ usage();
+ }
+
+ String bindingIdentifier = args[0] + "/" + args[1];
+ if (args.length > 2)
+ {
+ bindingIdentifier = bindingIdentifier + "/" + args[2];
+ }
+
+ QmfData arguments = new QmfData();
+ arguments.setValue("type", "binding");
+ arguments.setValue("name", bindingIdentifier);
+
+ try
+ {
+ _broker.invokeMethod("delete", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ }
+
+ /**
+ * Create an instance of QpidConfig.
+ *
+ * @param args the command line arguments.
+ */
+ public QpidConfig(final String[] args)
+ {
+ String[] longOpts = {"help", "durable", "bindings", "broker-addr=", "file-count=",
+ "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=",
+ "order=", "sequence", "ive", "force", "force-if-not-empty",
+ "force-if-used", "alternate-exchange=", "passive", "timeout=", "file=", "flow-stop-size=",
+ "flow-resume-size=", "flow-stop-count=", "flow-resume-count=", "argument="};
+
+ try
+ {
+ GetOpt getopt = new GetOpt(args, "ha:bf:", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+
+ //System.out.println("optList");
+ for (String[] opt : optList)
+ {
+ //System.out.println(opt[0] + ":" + opt[1]);
+
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ options();
+ }
+
+ if (opt[0].equals("-b") || opt[0].equals("--bindings"))
+ {
+ _recursive = true;
+ }
+
+ if (opt[0].equals("-a") || opt[0].equals("--broker-addr"))
+ {
+ _host = opt[1];
+ }
+
+ if (opt[0].equals("-f") || opt[0].equals("--file"))
+ {
+ _file = opt[1];
+ }
+
+ if (opt[0].equals("--timeout"))
+ {
+ _connTimeout = Integer.parseInt(opt[1]);
+ }
+
+ if (opt[0].equals("--alternate-exchange"))
+ {
+ _altExchange = opt[1];
+ }
+
+ if (opt[0].equals("--passive"))
+ {
+ _passive = true;
+ }
+
+ if (opt[0].equals("--durable"))
+ {
+ _durable = true;
+ }
+
+ if (opt[0].equals("--file-count"))
+ {
+ _fileCount = Long.parseLong(opt[1]);
+ }
+
+ if (opt[0].equals("--file-size"))
+ {
+ _fileSize = Long.parseLong(opt[1]);
+ }
+
+ if (opt[0].equals("--max-queue-size"))
+ {
+ _maxQueueSize = Long.parseLong(opt[1]);
+ }
+
+ if (opt[0].equals("--max-queue-count"))
+ {
+ _maxQueueCount = Long.parseLong(opt[1]);
+ }
+
+ if (opt[0].equals("--limit-policy"))
+ {
+ _limitPolicy = opt[1];
+ }
+
+ if (opt[0].equals("--flow-stop-size"))
+ {
+ _flowStopSize = Long.parseLong(opt[1]);
+ }
+
+ if (opt[0].equals("--flow-resume-size"))
+ {
+ _flowResumeSize = Long.parseLong(opt[1]);
+ }
+
+ if (opt[0].equals("--flow-stop-count"))
+ {
+ _flowStopCount = Long.parseLong(opt[1]);
+ }
+
+ if (opt[0].equals("--flow-resume-count"))
+ {
+ _flowResumeCount = Long.parseLong(opt[1]);
+ }
+
+ boolean validPolicy = false;
+ String[] validPolicies = {"none", "reject", "flow-to-disk", "ring", "ring-strict"};
+ for (String i : validPolicies)
+ {
+ if (_limitPolicy.equals(i))
+ {
+ validPolicy = true;
+ break;
+ }
+ }
+
+ if (!validPolicy)
+ {
+ System.err.println("Error: Invalid --limit-policy argument");
+ System.exit(1);
+ }
+
+ if (opt[0].equals("--order"))
+ {
+ _order = opt[1];
+ }
+
+ boolean validOrder = false;
+ String[] validOrders = {"fifo", "lvq", "lvq-no-browse"};
+ for (String i : validOrders)
+ {
+ if (_order.equals(i))
+ {
+ validOrder = true;
+ break;
+ }
+ }
+
+ if (!validOrder)
+ {
+ System.err.println("Error: Invalid --order argument");
+ System.exit(1);
+ }
+
+ if (opt[0].equals("--sequence"))
+ {
+ _msgSequence = true;
+ }
+
+ if (opt[0].equals("--ive"))
+ {
+ _ive = true;
+ }
+
+ if (opt[0].equals("--force"))
+ {
+ _ifEmpty = false;
+ _ifUnused = false;
+ }
+
+ if (opt[0].equals("--force-if-not-empty"))
+ {
+ _ifEmpty = false;
+ }
+
+ if (opt[0].equals("--force-if-used"))
+ {
+ _ifUnused = false;
+ }
+
+ if (opt[0].equals("--argument"))
+ {
+ extraArguments.add(opt[1]);
+ }
+ }
+
+ Connection connection = ConnectionHelper.createConnection(_host, "{reconnect: true}");
+ _console = new Console();
+ _console.disableEvents(); // Optimisation, as we're only doing getObjects() calls.
+ _console.addConnection(connection);
+ List<QmfConsoleData> brokers = _console.getObjects("org.apache.qpid.broker", "broker");
+ if (brokers.isEmpty())
+ {
+ System.out.println("No broker QmfConsoleData returned");
+ System.exit(1);
+ }
+
+ _broker = brokers.get(0);
+
+ int nargs = cargs.length;
+ if (nargs == 0)
+ {
+ overview();
+ }
+ else
+ {
+ String cmd = cargs[0];
+ String modifier = "";
+
+ if (nargs > 1)
+ {
+ modifier = cargs[1];
+ }
+
+ if (cmd.equals("exchanges"))
+ {
+ if (_recursive)
+ {
+ exchangeListRecurse(modifier);
+ }
+ else
+ {
+ exchangeList(modifier);
+ }
+ }
+ else if (cmd.equals("queues"))
+ {
+ if (_recursive)
+ {
+ queueListRecurse(modifier);
+ }
+ else
+ {
+ queueList(modifier);
+ }
+ }
+ else if (cmd.equals("add"))
+ {
+ if (modifier.equals("exchange"))
+ {
+ addExchange(Arrays.copyOfRange(cargs, 2, cargs.length));
+ }
+ else if (modifier.equals("queue"))
+ {
+ addQueue(Arrays.copyOfRange(cargs, 2, cargs.length));
+ }
+ else
+ {
+ usage();
+ }
+ }
+ else if (cmd.equals("del"))
+ {
+ if (modifier.equals("exchange"))
+ {
+ delExchange(Arrays.copyOfRange(cargs, 2, cargs.length));
+ }
+ else if (modifier.equals("queue"))
+ {
+ delQueue(Arrays.copyOfRange(cargs, 2, cargs.length));
+ }
+ else
+ {
+ usage();
+ }
+ }
+ else if (cmd.equals("bind"))
+ {
+ bind(Arrays.copyOfRange(cargs, 1, cargs.length));
+ }
+ else if (cmd.equals("unbind"))
+ {
+ unbind(Arrays.copyOfRange(cargs, 1, cargs.length));
+ }
+ else
+ {
+ usage();
+ }
+ }
+ }
+ catch (QmfException e)
+ {
+ System.err.println(e.toString());
+ usage();
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.err.println(e.toString());
+ usage();
+ }
+ }
+
+ /**
+ * Runs QpidConfig.
+ * @param args the command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ // As of Qpid 0.16 the Session Dispatcher Thread is non-Daemon so the JVM gets prevented from exiting.
+ // Setting the following property to true makes it a Daemon Thread.
+ System.setProperty("qpid.jms.daemon.dispatcher", "true");
+
+ QpidConfig qpidConfig = new QpidConfig(args);
+ }
+}
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
new file mode 100644
index 0000000000..fa06b06519
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
@@ -0,0 +1,358 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfData;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.MethodResult;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+// Reuse this class as it provides a handy mechanism to parse an args String into a Map
+import org.apache.qpid.messaging.util.AddressParser;
+
+/**
+ * A tool to allow QMF2 methods to be invoked from the command line.
+ * <pre>
+ * Usage: QpidCtrl [options] command [args]
+ * The args need to be in a Stringified Map format (similar to an Address String)
+ * e.g. to set broker log level: QpidCtrl setLogLevel "{level:\"debug+:Broker\"}"
+ * The listValues command lists property names and values of the specified object.
+ * The listObjects command lists all objects of the specified package and class.
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * -v enable logging
+ * -a &lt;address&gt;, --broker-address=&lt;address&gt;
+ * broker-addr is in the form: [username/password@]
+ * hostname | ip-address [:&lt;port&gt;] ex: localhost,
+ * 10.1.1.7:10000, broker-host:10000,
+ * guest/guest@localhost
+ * -c &lt;class&gt;, --class=&lt;class&gt;
+ * class of object on which command is being invoked
+ * (default broker)
+ * -p &lt;package&gt;, --package=&lt;package&gt;
+ * package of object on which command is being invoked
+ * (default org.apache.qpid.broker)
+ * -i &lt;id&gt;, --id=&lt;id&gt; identifier of object on which command is being invoked
+ * (default amqp-broker)
+ * --agent=&lt;agent name&gt;
+ * The name of the Agent to which commands will be sent
+ * This will try to match &lt;agent name&gt; against the Agent name
+ * the Agent product name and will also check if the Agent name
+ * contains the &lt;agent name&gt; String
+ * (default qpidd)
+ * --sasl-mechanism=&lt;mech&gt;
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * Examples (Note the quotes and escaped quotes are significant!):
+ * <p>
+ * Get the current broker log level:
+ * <pre>QpidCtrl getLogLevel</pre>
+ *
+ * Set the current broker log level to notice+:
+ * <pre>QpidCtrl setLogLevel "{level:\"notice+\"}"</pre>
+ *
+ * Set the current broker log level to debug+ for all Management Objects:
+ * <pre>QpidCtrl setLogLevel "{level:\"debug+\"}"</pre>
+ *
+ * Set the current broker log level to debug+ for just the Broker Management Object:
+ * <pre>QpidCtrl setLogLevel "{level:\"debug+:Broker\"}"</pre>
+ *
+ * List the properties of the qmf.default.direct exchange:
+ * <pre>QpidCtrl -c exchange -i qmf.default.direct listValues</pre>
+ *
+ * Create a queue called test with a flow-to-disk limit policy:
+ * <pre>QpidCtrl create "{type:queue,name:test,properties:{'qpid.policy_type':ring}}"</pre>
+ *
+ * Delete a queue called test:
+ * <pre>QpidCtrl delete "{type:queue,name:test}"</pre>
+ *
+ * Create a binding called bind1 between the amq.match exchange and the test queue matching the headers name=fadams
+ * and gender=male:
+ * <pre>QpidCtrl create "{type:binding,name:'amq.match/test/bind1',properties:{x-match:all,name:fadams,gender:male}}"</pre>
+ *
+ * Delete the binding called bind1 between the amq.match exchange and the test queue:
+ * <pre>QpidCtrl delete "{type:binding,name:'amq.match/test/bind1'}"</pre>
+ *
+ * Get the broker to echo a message:
+ * <pre>QpidCtrl echo "{sequence:1234,body:'Peaches En Regalia'}"</pre>
+ *
+ * Invoke the event method on the gizmo Agent (launch gizmo Agent via AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL --agent=gizmo event</pre>
+ *
+ * Invoke the create_child method on the gizmo Agent (launch gizmo Agent via AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL --agent=gizmo create_child "{name:monkeyBoy}"</pre>
+ *
+ * Invoke the stop method on the gizmo Agent (launch gizmo Agent via AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL --agent=gizmo stop "{message:'Will I dream?'}"</pre>
+ *
+ * @author Fraser Adams
+ */
+public final class QpidCtrl
+{
+ private static final String _usage =
+ "Usage: QpidCtrl [options] command [args]\n" +
+ "The args need to be in a Stringified Map format (similar to an Address String)\n" +
+ "e.g. to set broker log level: QpidCtrl setLogLevel \"{level:\\\"debug+:Broker\\\"}\"\n" +
+ "The listValues command lists property names and values of the specified object.\n" +
+ "The listObjects command lists all objects of the specified package and class.\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " -v enable logging\n" +
+ " -a <address>, --broker-address=<address>\n" +
+ " broker-addr is in the form: [username/password@]\n" +
+ " hostname | ip-address [:<port>] ex: localhost,\n" +
+ " 10.1.1.7:10000, broker-host:10000,\n" +
+ " guest/guest@localhost\n" +
+ " -c <class>, --class=<class>\n" +
+ " class of object on which command is being invoked\n" +
+ " (default broker)\n" +
+ " -p <package>, --package=<package>\n" +
+ " package of object on which command is being invoked\n" +
+ " (default org.apache.qpid.broker)\n" +
+ " -i <id>, --id=<id> identifier of object on which command is being invoked\n" +
+ " (default amqp-broker)\n" +
+ " --agent=<agent name>\n" +
+ " The name of the Agent to which commands will be sent\n" +
+ " This will try to match <agent name> against the Agent name,\n" +
+ " the Agent product name and will also check if the Agent name\n" +
+ " contains the <agent name> String\n" +
+ " (default qpidd)\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private Console _console;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations, Producers &amp; Consumers and starts connection.
+ * @param url the Connection URL.
+ * @param connectionOptions the connection options String to pass to ConnectionHelper.
+ * @param pkg the package name of the object we're invoking the method on.
+ * @param cls the class name of the object we're invoking the method on.
+ * @param id the ObjectId name of the object we're invoking the method on.
+ * @param agentName the name of the Agent to invoke the QMF method on.
+ * @param command the QMF method we're invoking.
+ * @param args the Stringified Map form of the method arguments.
+ */
+ public QpidCtrl(final String url, final String connectionOptions, final String pkg, final String cls,
+ final String id, final String agentName, final String command, final String args)
+ {
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url, connectionOptions);
+ _console = new Console();
+ _console.addConnection(connection);
+
+ // Find the specified Agent
+ Agent agent = _console.findAgent(agentName);
+ if (agent == null)
+ {
+ System.out.println("Agent " + agentName + " not found");
+ System.exit(1);
+ }
+
+ List<Agent> agentList = Arrays.asList(new Agent[] {agent});
+ List<QmfConsoleData> objects = _console.getObjects(pkg, cls, agentList);
+
+ // Parse the args String
+ QmfData inArgs = (args == null) ? new QmfData() : new QmfData(new AddressParser(args).map());
+
+ // Find the required QmfConsoleData object and invoke the specified command
+ MethodResult results = null;
+ for (QmfConsoleData object : objects)
+ {
+ String objectName = object.getObjectId().getObjectName();
+ if (command.equals("listObjects"))
+ {
+ System.out.println(objectName);
+ }
+ else
+ {
+ if (objectName.contains(id))
+ { // Use contains as ObjectNames may comprise other identifiers tha make using equals impractical
+ if (command.equals("listValues"))
+ {
+ object.listValues();
+ System.exit(1);
+ }
+ else
+ {
+ results = object.invokeMethod(command, inArgs);
+ }
+ break;
+ }
+ }
+ }
+
+ if (results == null)
+ {
+ if (objects.size() == 0)
+ {
+ System.out.println("getObjects(" + pkg + ", " + cls + ", " + agentName + ") returned no objects.");
+ }
+ else
+ {
+ System.out.println("Id " + id + " not found in " + pkg + ":" + cls);
+ }
+ }
+ else
+ {
+ if (results.succeeded())
+ {
+ results.listValues();
+ }
+ else
+ {
+ System.err.println ("QmfException " + results.getQmfException().getMessage() +
+ " returned from " + command + " method");
+ }
+ }
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidCtrl constructor");
+ }
+ }
+
+ /**
+ * Runs QpidCtrl.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ // As of Qpid 0.16 the Session Dispatcher Thread is non-Daemon so the JVM gets prevented from exiting.
+ // Setting the following property to true makes it a Daemon Thread.
+ System.setProperty("qpid.jms.daemon.dispatcher", "true");
+
+ String[] longOpts = {"help", "broker-address=", "class=", "package=", "id=", "agent=", "sasl-mechanism="};
+ try
+ {
+ String host = "localhost";
+ String connectionOptions = "{reconnect: true}";
+ String cls = "broker";
+ String pkg = "org.apache.qpid.broker";
+ String id = "amqp-broker";
+ String agentName = "qpidd";
+ String command = null;
+ String arg = null;
+
+ GetOpt getopt = new GetOpt(args, "ha:c:p:i:v", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("-a") || opt[0].equals("--broker-address"))
+ {
+ host = opt[1];
+ }
+ else if (opt[0].equals("-c") || opt[0].equals("--class"))
+ {
+ cls = opt[1];
+ }
+ else if (opt[0].equals("-p") || opt[0].equals("--package"))
+ {
+ pkg = opt[1];
+ }
+ else if (opt[0].equals("-i") || opt[0].equals("--id"))
+ {
+ id = opt[1];
+ }
+ else if (opt[0].equals("--agent"))
+ {
+ agentName = opt[1];
+ }
+ else if (opt[0].equals("-v"))
+ {
+ System.setProperty("amqj.logging.level", "DEBUG");
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+ }
+ }
+
+ if (cargs.length < 1 || cargs.length > 2)
+ {
+ System.out.println(Arrays.asList(cargs));
+ System.out.println(_usage);
+ System.exit(1);
+ }
+
+ command = cargs[0];
+
+ if (cargs.length == 2)
+ {
+ arg = cargs[1];
+ if (!arg.startsWith("{") || !arg.endsWith("}"))
+ {
+ System.out.println("Incorrect format for args.");
+ System.out.println("This needs to be in a Stringified Map format similar to an Address String");
+ System.exit(1);
+ }
+ }
+
+ QpidCtrl qpidCtrl = new QpidCtrl(host, connectionOptions, pkg, cls, id, agentName, command, arg);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.out.println(e.getMessage());
+ System.exit(1);
+ }
+ }
+}
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
new file mode 100644
index 0000000000..34a38bf8d7
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+import static org.apache.qpid.qmf2.common.WorkItem.WorkItemType.*;
+
+/**
+ * Collect and print events from one or more Qpid message brokers.
+ * <pre>
+ * If no broker-addr is supplied, QpidPrintEvents connects to 'localhost:5672'.
+ *
+ * [broker-addr] syntax:
+ *
+ * [username/password@] hostname
+ * ip-address [:&lt;port&gt;]
+ *
+ * Examples:
+ *
+ * $ QpidPrintEvents localhost:5672
+ * $ QpidPrintEvents 10.1.1.7:10000
+ * $ QpidPrintEvents guest/guest@broker-host:10000
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * --heartbeats Use heartbeats.
+ * --sasl-mechanism=&lt;mech&gt;
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QpidPrintEvents implements QmfEventListener
+{
+ private static final String _usage =
+ "Usage: QpidPrintEvents [options] [broker-addr]...\n";
+
+ private static final String _description =
+ "Collect and print events from one or more Qpid message brokers.\n" +
+ "\n" +
+ "If no broker-addr is supplied, QpidPrintEvents connects to 'localhost:5672'.\n" +
+ "\n" +
+ "[broker-addr] syntax:\n" +
+ "\n" +
+ "[username/password@] hostname\n" +
+ "ip-address [:<port>]\n" +
+ "\n" +
+ "Examples:\n" +
+ "\n" +
+ "$ QpidPrintEvents localhost:5672\n" +
+ "$ QpidPrintEvents 10.1.1.7:10000\n" +
+ "$ QpidPrintEvents guest/guest@broker-host:10000\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " --heartbeats Use heartbeats.\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private final String _url;
+ private Console _console;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations, Producers &amp; Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ */
+ public QpidPrintEvents(final String url, final String connectionOptions)
+ {
+ System.out.println("Connecting to " + url);
+ _url = url;
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url, connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidPrintEvents constructor");
+ }
+ }
+
+ /**
+ * Checks if the WorkItem is an EventReceivedWorkItem and if it is extracts and renders the QmfEvent.
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof EventReceivedWorkItem)
+ {
+ EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
+ QmfEvent event = item.getEvent();
+ System.out.println(event + " broker=" + _url);
+ }
+ }
+
+ /**
+ * Runs QpidPrintEvents.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "heartbeats", "sasl-mechanism="};
+ try
+ {
+ String connectionOptions = "{reconnect: true}";
+ GetOpt getopt = new GetOpt(args, "h", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_description);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("--heartbeats"))
+ {
+ // Ignore Java uses heartbeats by default
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+ }
+ }
+
+ int nargs = cargs.length;
+ if (nargs == 0)
+ {
+ cargs = new String[] {"localhost"};
+ }
+
+ for (String url : cargs)
+ {
+ QpidPrintEvents eventPrinter = new QpidPrintEvents(url, connectionOptions);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.exit(1);
+ }
+
+ BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
+ try
+ { // Blocks here until return is pressed
+ System.out.println("Hit Return to exit");
+ String s = commandLine.readLine();
+ System.exit(0);
+ }
+ catch (IOException e)
+ {
+ System.out.println ("QpidPrintEvents main(): IOException: " + e.getMessage());
+ }
+ }
+}
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
new file mode 100644
index 0000000000..4ac434dc17
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
@@ -0,0 +1,374 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.QmfQuery;
+import org.apache.qpid.qmf2.common.QmfQueryTarget;
+import org.apache.qpid.qmf2.common.SchemaClassId;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.AgentHeartbeatWorkItem;
+import org.apache.qpid.qmf2.console.AgentRestartedWorkItem;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.console.SubscribeIndication;
+import org.apache.qpid.qmf2.console.SubscribeParams;
+import org.apache.qpid.qmf2.console.SubscriptionIndicationWorkItem;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * Collect and print queue statistics.
+ * <pre>
+ * Usage: QpidQueueStats [options]
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * -a &lt;address&gt;, --broker-address=&lt;address&gt;
+ * broker-addr is in the form: [username/password@]
+ * hostname | ip-address [:&lt;port&gt;] ex: localhost,
+ * 10.1.1.7:10000, broker-host:10000,
+ * guest/guest@localhost
+ * -f &lt;filter&gt;, --filter=&lt;filter&gt;
+ * a list of comma separated queue names (regex are
+ * accepted) to show
+ * --sasl-mechanism=&lt;mech&gt;
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QpidQueueStats implements QmfEventListener
+{
+ private final class Stats
+ {
+ private final String _name;
+ private QmfConsoleData _data;
+
+ public Stats(final String name, final QmfConsoleData data)
+ {
+ _name = name;
+ _data = data;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QmfConsoleData getData()
+ {
+ return _data;
+ }
+
+ public void setData(final QmfConsoleData data)
+ {
+ _data = data;
+ }
+ }
+
+ private static final String _usage =
+ "Usage: QpidQueueStats [options]\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " -a <address>, --broker-address=<address>\n" +
+ " broker-addr is in the form: [username/password@]\n" +
+ " hostname | ip-address [:<port>] ex: localhost,\n" +
+ " 10.1.1.7:10000, broker-host:10000,\n" +
+ " guest/guest@localhost\n" +
+ " -f <filter>, --filter=<filter>\n" +
+ " a list of comma separated queue names (regex are\n" +
+ " accepted) to show\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private final String _url;
+ private final List<Pattern> _filter;
+ private Agent _broker;
+ private Console _console;
+ private Map<ObjectId, Stats> _objects = new HashMap<ObjectId, Stats>();
+ private String _subscriptionId = null;
+ private long _subscriptionDuration;
+ private long _startTime;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations, Producers &amp; Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ * @param filter a list of regex Patterns used to choose the queues we wish to display.
+ */
+ public QpidQueueStats(final String url, final String connectionOptions, final List<Pattern> filter)
+ {
+ System.out.println("Connecting to " + url);
+ if (filter.size() > 0)
+ {
+ System.out.println("Filter = " + filter);
+ }
+ _url = url;
+ _filter = filter;
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url, connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+
+ // Wait until the broker Agent has been discovered
+ _broker = _console.findAgent("broker");
+ if (_broker != null)
+ {
+ createQueueSubscription();
+ }
+
+ System.out.println("Hit Return to exit");
+ System.out.println(
+ "Queue Name Sec Depth Enq Rate Deq Rate");
+ System.out.println(
+ "=============================================================================================");
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidQueueStats constructor");
+ }
+ }
+
+ /**
+ * Create a Subscription to query for all queue objects
+ */
+ private void createQueueSubscription()
+ {
+ try
+ { // This QmfQuery simply does an ID query for objects with the className "queue"
+ QmfQuery query = new QmfQuery(QmfQueryTarget.OBJECT, new SchemaClassId("queue"));
+ SubscribeParams params = _console.createSubscription(_broker, query, "queueStatsHandle");
+ _subscriptionId = params.getSubscriptionId();
+ _subscriptionDuration = params.getLifetime() - 10; // Subtract 10 as we want to refresh before it times out
+ _startTime = System.currentTimeMillis();
+ }
+ catch (QmfException qmfe)
+ {
+ }
+ }
+
+ /**
+ * Main Event handler. Checks if the WorkItem is a SubscriptionIndicationWorkItem, if it is it stores the object
+ * in a Map and uses this to maintain state so we can record deltas such as enqueue and dequeue rates.
+ * <p>
+ * The AgentHeartbeatWorkItem is used to periodically compare the elapsed time against the Subscription duration
+ * so that we can refresh the Subscription (or create a new one if necessary) in order to continue receiving
+ * queue Management Object data from the broker.
+ * <p>
+ * When the AgentRestartedWorkItem is received we clear the state to remove any stale queue Management Objects.
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof AgentHeartbeatWorkItem && _subscriptionId != null)
+ {
+ long elapsed = (long)Math.round((System.currentTimeMillis() - _startTime)/1000.0f);
+ if (elapsed > _subscriptionDuration)
+ {
+ try
+ {
+ _console.refreshSubscription(_subscriptionId);
+ _startTime = System.currentTimeMillis();
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidQueueStats onEvent");
+ createQueueSubscription();
+ }
+ }
+ }
+ else if (wi instanceof AgentRestartedWorkItem)
+ {
+ _objects.clear();
+ }
+ else if (wi instanceof SubscriptionIndicationWorkItem)
+ {
+ SubscriptionIndicationWorkItem item = (SubscriptionIndicationWorkItem)wi;
+ SubscribeIndication indication = item.getSubscribeIndication();
+ String correlationId = indication.getConsoleHandle();
+ if (correlationId.equals("queueStatsHandle"))
+ { // If it is (and it should be!!) then it's our queue object Subscription
+ List<QmfConsoleData> data = indication.getData();
+ for (QmfConsoleData record : data)
+ {
+ ObjectId id = record.getObjectId();
+ if (record.isDeleted())
+ { // If the object was deleted by the Agent we remove it from out Map
+ _objects.remove(id);
+ }
+ else
+ {
+ if (_objects.containsKey(id))
+ { // If the object is already in the Map it's likely to be a statistics push from the broker.
+ Stats stats = _objects.get(id);
+ String name = stats.getName();
+
+ boolean matches = false;
+ for (Pattern x : _filter)
+ { // Check the queue name against the regexes in the filter List (if any)
+ Matcher m = x.matcher(name);
+ if (m.find())
+ {
+ matches = true;
+ break;
+ }
+ }
+
+ if (_filter.isEmpty() || matches)
+ { // If there's no filter enabled or the filter matches the queue name we display statistics.
+ QmfConsoleData lastSample = stats.getData();
+ stats.setData(record);
+
+ float deltaTime = record.getUpdateTime() - lastSample.getUpdateTime();
+ if (deltaTime > 1000000000.0f)
+ {
+ float deltaEnqueues = record.getLongValue("msgTotalEnqueues") -
+ lastSample.getLongValue("msgTotalEnqueues");
+ float deltaDequeues = record.getLongValue("msgTotalDequeues") -
+ lastSample.getLongValue("msgTotalDequeues");
+ long msgDepth = record.getLongValue("msgDepth");
+ float enqueueRate = deltaEnqueues/(deltaTime/1000000000.0f);
+ float dequeueRate = deltaDequeues/(deltaTime/1000000000.0f);
+
+ System.out.printf("%-46s%10.2f%11d%13.2f%13.2f\n",
+ name, deltaTime/1000000000, msgDepth, enqueueRate, dequeueRate);
+ }
+ }
+ }
+ else
+ { // If the object isn't in the Map it's likely to be a properties push from the broker.
+ if (!record.hasValue("name"))
+ { // This probably won't happen, but if it does we refresh the object to get its full state.
+ try
+ {
+ record.refresh();
+ }
+ catch (QmfException qmfe)
+ {
+ }
+ }
+ String queueName = record.getStringValue("name");
+ _objects.put(id, new Stats(queueName, record));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs QpidQueueStats.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "broker-address=", "filter=", "sasl-mechanism="};
+ try
+ {
+ String host = "localhost";
+ String connectionOptions = "{reconnect: true}";
+ List<Pattern> filter = new ArrayList<Pattern>();
+ GetOpt getopt = new GetOpt(args, "ha:f:", longOpts);
+ List<String[]> optList = getopt.getOptList();
+
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("-a") || opt[0].equals("--broker-address"))
+ {
+ host = opt[1];
+ }
+ else if (opt[0].equals("-f") || opt[0].equals("--filter"))
+ {
+ String[] split = opt[1].split(",");
+ for (String s : split)
+ {
+ Pattern p = Pattern.compile(s);
+ filter.add(p);
+ }
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+ }
+ }
+
+ QpidQueueStats queueStats = new QpidQueueStats(host, connectionOptions, filter);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.out.println(e.getMessage());
+ System.exit(1);
+ }
+
+ BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
+ try
+ { // Blocks here until return is pressed
+ String s = commandLine.readLine();
+ System.exit(0);
+ }
+ catch (IOException e)
+ {
+ System.out.println ("QpidQueueStats main(): IOException: " + e.getMessage());
+ }
+ }
+}
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
new file mode 100644
index 0000000000..31368ed242
--- /dev/null
+++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
@@ -0,0 +1,364 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfData;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.SchemaClassId;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * QueueFuse provides protection to message producers from consumers who can't consume messages fast enough.
+ * <p>
+ * With the default "reject" limit policy when a queue exceeds its capacity an exception is thrown to the
+ * producer. This behaviour is unfortunate, because if there happen to be multiple consumers consuming
+ * messages from a given producer it is possible for a single slow consumer to cause message flow to be
+ * stopped to <u>all</u> consumers, in other words a de-facto denial of service may take place.
+ * <p>
+ * In an Enterprise environment it is likely that this sort of behaviour is unwelcome, so QueueFuse makes it
+ * possible for queueThresholdExceeded Events to be detected and for the offending queues to have messages
+ * purged, thus protecting the other consumers by preventing an exception being thrown to the message producer.
+ * <p>
+ * The original intention with this class was to unbind bindings to queues that exceed the threshold. This method
+ * works, but it has a number of disadvantages. In particular there is no way to unbind from (and thus protect)
+ * queues bound to the default direct exchange, in addition in order to unbind it is necessary to retrieve
+ * binding and exchange information, both of which require further exchanges with the broker (which is not
+ * desirable as when the queueThresholdExceeded occurs we need to act pretty quickly). Finally as it happens
+ * it is also necessary to purge some messages after unbinding anyway as if this is not done the queue remains
+ * in the flowStopped state and producers will eventually time out and throw an exception if this is not cleared.
+ * So all in all simply purging each time we cross the threshold is simpler and has the additional advantage that
+ * if and when the consumer speeds up message delivery will eventually return to normal.
+ *
+ * <pre>
+ * Usage: QueueFuse [options] [broker-addr]...
+ *
+ * Monitors one or more Qpid message brokers for queueThresholdExceeded Events.
+ *
+ * If a queueThresholdExceeded Event occurs messages are purged from the queue,
+ * in other words this class behaves rather like a fuse 'blowing' if the
+ * threshold gets exceeded.
+ *
+ * If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.
+ *
+ * [broker-addr] syntax:
+ *
+ * [username/password@] hostname
+ * ip-address [:&lt;port&gt;]
+ *
+ * Examples:
+ *
+ * $ QueueFuse localhost:5672
+ * $ QueueFuse 10.1.1.7:10000
+ * $ QueueFuse guest/guest@broker-host:10000
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * -f &lt;filter&gt;, --filter=&lt;filter&gt;
+ * a list of comma separated queue names (regex are
+ * accepted) to protect (default is to protect all).
+ * -p &lt;PERCENT&gt;, --purge=&lt;PERCENT&gt;\n" +
+ * The percentage of messages to purge when the queue\n" +
+ * threshold gets exceeded (default = 20%).\n" +
+ * N.B. if this gets set too low the fuse may not blow.\n" +
+ * --sasl-mechanism=&lt;mech&gt;
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QueueFuse implements QmfEventListener
+{
+ private static final String _usage =
+ "Usage: QueueFuse [options] [broker-addr]...\n";
+
+ private static final String _description =
+ "Monitors one or more Qpid message brokers for queueThresholdExceeded Events.\n" +
+ "\n" +
+ "If a queueThresholdExceeded Event occurs messages are purged from the queue,\n" +
+ "in other words this class behaves rather like a fuse 'blowing' if the\n" +
+ "threshold gets exceeded.\n" +
+ "\n" +
+ "If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.\n" +
+ "\n" +
+ "[broker-addr] syntax:\n" +
+ "\n" +
+ "[username/password@] hostname\n" +
+ "ip-address [:<port>]\n" +
+ "\n" +
+ "Examples:\n" +
+ "\n" +
+ "$ QueueFuse localhost:5672\n" +
+ "$ QueueFuse 10.1.1.7:10000\n" +
+ "$ QueueFuse guest/guest@broker-host:10000\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " -f <filter>, --filter=<filter>\n" +
+ " a list of comma separated queue names (regex are\n" +
+ " accepted) to protect (default is to protect all).\n" +
+ " -p <PERCENT>, --purge=<PERCENT>\n" +
+ " The percentage of messages to purge when the queue\n" +
+ " threshold gets exceeded (default = 20%).\n" +
+ " N.B. if this gets set too low the fuse may not blow.\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private final String _url;
+ private final List<Pattern> _filter;
+ private final float _purge;
+ private Map<String, QmfConsoleData> _queueCache = new HashMap<String, QmfConsoleData>(50);
+ private Console _console;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations, Producers &amp; Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ * @param filter a list of regex Patterns used to choose the queues we wish to protect.
+ * @param purge the ratio of messages that we wish to purge if the threshold gets exceeded.
+ */
+ public QueueFuse(final String url, final String connectionOptions, final List<Pattern> filter, final float purge)
+ {
+ System.out.println("QueueFuse Connecting to " + url);
+ if (filter.size() > 0)
+ {
+ System.out.println("Filter = " + filter);
+ }
+ _url = url;
+ _filter = filter;
+ _purge = purge;
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url, connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+ updateQueueCache();
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println("QmfException " + qmfe.getMessage() + " caught in QueueFuse constructor");
+ }
+ }
+
+ /**
+ * Looks up queue objects and stores them in _queueCache keyed by the queue name
+ */
+ private void updateQueueCache()
+ {
+ _queueCache.clear();
+ List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
+ for (QmfConsoleData queue : queues)
+ {
+ String queueName = queue.getStringValue("name");
+ _queueCache.put(queueName, queue);
+ }
+ }
+
+ /**
+ * Look up a queue object with the given name and if it's not a ring queue invoke the queue's purge method.
+ * @param queueName the name of the queue to purge
+ * @param msgDepth the number of messages on the queue, used to determine how many messages to purge.
+ */
+ private void purgeQueue(final String queueName, long msgDepth)
+ {
+ QmfConsoleData queue = _queueCache.get(queueName);
+
+ if (queue == null)
+ {
+ System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s reference couldn't be found\n",
+ new Date().toString(), queueName);
+ }
+ else
+ { // If we've found a queue called queueName we then find the bindings that reference it.
+
+ Map args = (Map)queue.getValue("arguments");
+ String policyType = (String)args.get("qpid.policy_type");
+ if (policyType != null && policyType.equals("ring"))
+ { // If qpid.policy_type=ring we return.
+ return;
+ }
+
+ try
+ {
+ QmfData arguments = new QmfData();
+ arguments.setValue("request", (long)(_purge*msgDepth));
+ queue.invokeMethod("purge", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Main Event handler.
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof EventReceivedWorkItem)
+ {
+ EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
+ Agent agent = item.getAgent();
+ QmfEvent event = item.getEvent();
+ String className = event.getSchemaClassId().getClassName();
+
+ if (className.equals("queueDeclare"))
+ {
+ updateQueueCache();
+ }
+ else if (className.equals("queueThresholdExceeded"))
+ {
+ String queueName = event.getStringValue("qName");
+ boolean matches = false;
+ for (Pattern x : _filter)
+ { // Check the queue name against the regexes in the filter List (if any)
+ Matcher m = x.matcher(queueName);
+ if (m.find())
+ {
+ matches = true;
+ break;
+ }
+ }
+
+ if (_filter.isEmpty() || matches)
+ { // If there's no filter enabled or the filter matches the queue name we call purgeQueue().
+ long msgDepth = event.getLongValue("msgDepth");
+ purgeQueue(queueName, msgDepth);
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs QueueFuse.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "filter=", "purge=", "sasl-mechanism="};
+ try
+ {
+ boolean includeRingQueues = false;
+ String connectionOptions = "{reconnect: true}";
+ List<Pattern> filter = new ArrayList<Pattern>();
+ float purge = 0.2f;
+ GetOpt getopt = new GetOpt(args, "hf:p:", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_description);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("-f") || opt[0].equals("--filter"))
+ {
+ String[] split = opt[1].split(",");
+ for (String s : split)
+ {
+ Pattern p = Pattern.compile(s);
+ filter.add(p);
+ }
+ }
+ else if (opt[0].equals("-p") || opt[0].equals("--purge"))
+ {
+ int percent = Integer.parseInt(opt[1]);
+ if (percent < 0 || percent > 100)
+ {
+ System.out.println(_usage);
+ System.exit(1);
+ }
+ purge = percent/100.0f;
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+ }
+ }
+
+ int nargs = cargs.length;
+ if (nargs == 0)
+ {
+ cargs = new String[] {"localhost"};
+ }
+
+ for (String url : cargs)
+ {
+ QueueFuse queueFuse = new QueueFuse(url, connectionOptions, filter, purge);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.out.println(e.getMessage());
+ System.exit(1);
+ }
+
+ try
+ { // Block here
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException ie)
+ {
+ }
+ }
+}