diff options
Diffstat (limited to 'qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java')
-rw-r--r-- | qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java new file mode 100644 index 0000000000..31368ed242 --- /dev/null +++ b/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java @@ -0,0 +1,364 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf2.tools; + +// JMS Imports +import javax.jms.Connection; + +// Misc Imports +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +// QMF2 Imports +import org.apache.qpid.qmf2.common.ObjectId; +import org.apache.qpid.qmf2.common.QmfData; +import org.apache.qpid.qmf2.common.QmfEvent; +import org.apache.qpid.qmf2.common.QmfEventListener; +import org.apache.qpid.qmf2.common.QmfException; +import org.apache.qpid.qmf2.common.SchemaClassId; +import org.apache.qpid.qmf2.common.WorkItem; +import org.apache.qpid.qmf2.console.Agent; +import org.apache.qpid.qmf2.console.Console; +import org.apache.qpid.qmf2.console.EventReceivedWorkItem; +import org.apache.qpid.qmf2.console.QmfConsoleData; + +import org.apache.qpid.qmf2.util.ConnectionHelper; +import org.apache.qpid.qmf2.util.GetOpt; + +/** + * QueueFuse provides protection to message producers from consumers who can't consume messages fast enough. + * <p> + * With the default "reject" limit policy when a queue exceeds its capacity an exception is thrown to the + * producer. This behaviour is unfortunate, because if there happen to be multiple consumers consuming + * messages from a given producer it is possible for a single slow consumer to cause message flow to be + * stopped to <u>all</u> consumers, in other words a de-facto denial of service may take place. + * <p> + * In an Enterprise environment it is likely that this sort of behaviour is unwelcome, so QueueFuse makes it + * possible for queueThresholdExceeded Events to be detected and for the offending queues to have messages + * purged, thus protecting the other consumers by preventing an exception being thrown to the message producer. + * <p> + * The original intention with this class was to unbind bindings to queues that exceed the threshold. This method + * works, but it has a number of disadvantages. In particular there is no way to unbind from (and thus protect) + * queues bound to the default direct exchange, in addition in order to unbind it is necessary to retrieve + * binding and exchange information, both of which require further exchanges with the broker (which is not + * desirable as when the queueThresholdExceeded occurs we need to act pretty quickly). Finally as it happens + * it is also necessary to purge some messages after unbinding anyway as if this is not done the queue remains + * in the flowStopped state and producers will eventually time out and throw an exception if this is not cleared. + * So all in all simply purging each time we cross the threshold is simpler and has the additional advantage that + * if and when the consumer speeds up message delivery will eventually return to normal. + * + * <pre> + * Usage: QueueFuse [options] [broker-addr]... + * + * Monitors one or more Qpid message brokers for queueThresholdExceeded Events. + * + * If a queueThresholdExceeded Event occurs messages are purged from the queue, + * in other words this class behaves rather like a fuse 'blowing' if the + * threshold gets exceeded. + * + * If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'. + * + * [broker-addr] syntax: + * + * [username/password@] hostname + * ip-address [:<port>] + * + * Examples: + * + * $ QueueFuse localhost:5672 + * $ QueueFuse 10.1.1.7:10000 + * $ QueueFuse guest/guest@broker-host:10000 + * + * Options: + * -h, --help show this help message and exit + * -f <filter>, --filter=<filter> + * a list of comma separated queue names (regex are + * accepted) to protect (default is to protect all). + * -p <PERCENT>, --purge=<PERCENT>\n" + + * The percentage of messages to purge when the queue\n" + + * threshold gets exceeded (default = 20%).\n" + + * N.B. if this gets set too low the fuse may not blow.\n" + + * --sasl-mechanism=<mech> + * SASL mechanism for authentication (e.g. EXTERNAL, + * ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL + * automatically picks the most secure available + * mechanism - use this option to override. + * </pre> + * @author Fraser Adams + */ +public final class QueueFuse implements QmfEventListener +{ + private static final String _usage = + "Usage: QueueFuse [options] [broker-addr]...\n"; + + private static final String _description = + "Monitors one or more Qpid message brokers for queueThresholdExceeded Events.\n" + + "\n" + + "If a queueThresholdExceeded Event occurs messages are purged from the queue,\n" + + "in other words this class behaves rather like a fuse 'blowing' if the\n" + + "threshold gets exceeded.\n" + + "\n" + + "If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.\n" + + "\n" + + "[broker-addr] syntax:\n" + + "\n" + + "[username/password@] hostname\n" + + "ip-address [:<port>]\n" + + "\n" + + "Examples:\n" + + "\n" + + "$ QueueFuse localhost:5672\n" + + "$ QueueFuse 10.1.1.7:10000\n" + + "$ QueueFuse guest/guest@broker-host:10000\n"; + + private static final String _options = + "Options:\n" + + " -h, --help show this help message and exit\n" + + " -f <filter>, --filter=<filter>\n" + + " a list of comma separated queue names (regex are\n" + + " accepted) to protect (default is to protect all).\n" + + " -p <PERCENT>, --purge=<PERCENT>\n" + + " The percentage of messages to purge when the queue\n" + + " threshold gets exceeded (default = 20%).\n" + + " N.B. if this gets set too low the fuse may not blow.\n" + + " --sasl-mechanism=<mech>\n" + + " SASL mechanism for authentication (e.g. EXTERNAL,\n" + + " ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" + + " automatically picks the most secure available\n" + + " mechanism - use this option to override.\n"; + + private final String _url; + private final List<Pattern> _filter; + private final float _purge; + private Map<String, QmfConsoleData> _queueCache = new HashMap<String, QmfConsoleData>(50); + private Console _console; + + /** + * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection. + * @param url the connection URL. + * @param connectionOptions the options String to pass to ConnectionHelper. + * @param filter a list of regex Patterns used to choose the queues we wish to protect. + * @param purge the ratio of messages that we wish to purge if the threshold gets exceeded. + */ + public QueueFuse(final String url, final String connectionOptions, final List<Pattern> filter, final float purge) + { + System.out.println("QueueFuse Connecting to " + url); + if (filter.size() > 0) + { + System.out.println("Filter = " + filter); + } + _url = url; + _filter = filter; + _purge = purge; + try + { + Connection connection = ConnectionHelper.createConnection(url, connectionOptions); + _console = new Console(this); + _console.addConnection(connection); + updateQueueCache(); + } + catch (QmfException qmfe) + { + System.err.println("QmfException " + qmfe.getMessage() + " caught in QueueFuse constructor"); + } + } + + /** + * Looks up queue objects and stores them in _queueCache keyed by the queue name + */ + private void updateQueueCache() + { + _queueCache.clear(); + List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue"); + for (QmfConsoleData queue : queues) + { + String queueName = queue.getStringValue("name"); + _queueCache.put(queueName, queue); + } + } + + /** + * Look up a queue object with the given name and if it's not a ring queue invoke the queue's purge method. + * @param queueName the name of the queue to purge + * @param msgDepth the number of messages on the queue, used to determine how many messages to purge. + */ + private void purgeQueue(final String queueName, long msgDepth) + { + QmfConsoleData queue = _queueCache.get(queueName); + + if (queue == null) + { + System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s reference couldn't be found\n", + new Date().toString(), queueName); + } + else + { // If we've found a queue called queueName we then find the bindings that reference it. + + Map args = (Map)queue.getValue("arguments"); + String policyType = (String)args.get("qpid.policy_type"); + if (policyType != null && policyType.equals("ring")) + { // If qpid.policy_type=ring we return. + return; + } + + try + { + QmfData arguments = new QmfData(); + arguments.setValue("request", (long)(_purge*msgDepth)); + queue.invokeMethod("purge", arguments); + } + catch (QmfException e) + { + System.out.println(e.getMessage()); + } + } + } + + /** + * Main Event handler. + * @param wi a QMF2 WorkItem object + */ + public void onEvent(final WorkItem wi) + { + if (wi instanceof EventReceivedWorkItem) + { + EventReceivedWorkItem item = (EventReceivedWorkItem)wi; + Agent agent = item.getAgent(); + QmfEvent event = item.getEvent(); + String className = event.getSchemaClassId().getClassName(); + + if (className.equals("queueDeclare")) + { + updateQueueCache(); + } + else if (className.equals("queueThresholdExceeded")) + { + String queueName = event.getStringValue("qName"); + boolean matches = false; + for (Pattern x : _filter) + { // Check the queue name against the regexes in the filter List (if any) + Matcher m = x.matcher(queueName); + if (m.find()) + { + matches = true; + break; + } + } + + if (_filter.isEmpty() || matches) + { // If there's no filter enabled or the filter matches the queue name we call purgeQueue(). + long msgDepth = event.getLongValue("msgDepth"); + purgeQueue(queueName, msgDepth); + } + } + } + } + + /** + * Runs QueueFuse. + * @param args the command line arguments. + */ + public static void main(final String[] args) + { + String logLevel = System.getProperty("amqj.logging.level"); + logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG. + System.setProperty("amqj.logging.level", logLevel); + + String[] longOpts = {"help", "filter=", "purge=", "sasl-mechanism="}; + try + { + boolean includeRingQueues = false; + String connectionOptions = "{reconnect: true}"; + List<Pattern> filter = new ArrayList<Pattern>(); + float purge = 0.2f; + GetOpt getopt = new GetOpt(args, "hf:p:", longOpts); + List<String[]> optList = getopt.getOptList(); + String[] cargs = {}; + cargs = getopt.getEncArgs().toArray(cargs); + for (String[] opt : optList) + { + if (opt[0].equals("-h") || opt[0].equals("--help")) + { + System.out.println(_usage); + System.out.println(_description); + System.out.println(_options); + System.exit(1); + } + else if (opt[0].equals("-f") || opt[0].equals("--filter")) + { + String[] split = opt[1].split(","); + for (String s : split) + { + Pattern p = Pattern.compile(s); + filter.add(p); + } + } + else if (opt[0].equals("-p") || opt[0].equals("--purge")) + { + int percent = Integer.parseInt(opt[1]); + if (percent < 0 || percent > 100) + { + System.out.println(_usage); + System.exit(1); + } + purge = percent/100.0f; + } + else if (opt[0].equals("--sasl-mechanism")) + { + connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}"; + } + } + + int nargs = cargs.length; + if (nargs == 0) + { + cargs = new String[] {"localhost"}; + } + + for (String url : cargs) + { + QueueFuse queueFuse = new QueueFuse(url, connectionOptions, filter, purge); + } + } + catch (IllegalArgumentException e) + { + System.out.println(_usage); + System.out.println(e.getMessage()); + System.exit(1); + } + + try + { // Block here + Thread.currentThread().join(); + } + catch (InterruptedException ie) + { + } + } +} |