diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java | 652 |
1 files changed, 652 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java new file mode 100644 index 0000000000..dca165fa7e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.configuration.Configuration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.tools.messagestore.commands.Clear; +import org.apache.qpid.tools.messagestore.commands.Command; +import org.apache.qpid.tools.messagestore.commands.Copy; +import org.apache.qpid.tools.messagestore.commands.Dump; +import org.apache.qpid.tools.messagestore.commands.Help; +import org.apache.qpid.tools.messagestore.commands.List; +import org.apache.qpid.tools.messagestore.commands.Load; +import org.apache.qpid.tools.messagestore.commands.Quit; +import org.apache.qpid.tools.messagestore.commands.Select; +import org.apache.qpid.tools.messagestore.commands.Show; +import org.apache.qpid.tools.messagestore.commands.Move; +import org.apache.qpid.tools.messagestore.commands.Purge; +import org.apache.qpid.tools.utils.CommandParser; +import org.apache.qpid.tools.utils.Console; +import org.apache.qpid.tools.utils.SimpleCommandParser; +import org.apache.qpid.tools.utils.SimpleConsole; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.StringTokenizer; + +/** + * MessageStoreTool. + */ +public class MessageStoreTool +{ + /** Text outputted at the start of each console.*/ + private static final String BOILER_PLATE = "MessageStoreTool - for examining Persistent Qpid Broker MessageStore instances"; + + /** I/O Wrapper. */ + protected Console _console; + + /** Batch mode flag. */ + protected boolean _batchMode; + + /** Internal State object. */ + private State _state = new State(); + + private HashMap<String, Command> _commands = new HashMap<String, Command>(); + + /** SLF4J Logger. */ + private static Logger _devlog = LoggerFactory.getLogger(MessageStoreTool.class); + + /** Loaded configuration file. */ + private Configuration _config; + + /** Control used for main run loop. */ + private boolean _running = true; + private boolean _initialised = false; + + //---------------------------------------------------------------------------------------------------/ + + public static void main(String[] args) throws Configuration.InitException + { + + MessageStoreTool tool = new MessageStoreTool(args); + + tool.start(); + } + + + public MessageStoreTool(String[] args) throws Configuration.InitException + { + this(args, System.in, System.out); + } + + public MessageStoreTool(String[] args, InputStream in, OutputStream out) throws Configuration.InitException + { + BufferedReader consoleReader = new BufferedReader(new InputStreamReader(in)); + BufferedWriter consoleWriter = new BufferedWriter(new OutputStreamWriter(out)); + + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this))); + _batchMode = false; + + _console = new SimpleConsole(consoleWriter, consoleReader); + + _config = new Configuration(); + + setOptions(); + _config.processCommandline(args); + } + + + private void setOptions() + { + Option help = new Option("h", "help", false, "print this message"); + Option version = new Option("v", "version", false, "print the version information and exit"); + Option configFile = + OptionBuilder.withArgName("file").hasArg() + .withDescription("use given configuration file By " + + "default looks for a file named " + + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME) + .withLongOpt("config") + .create("c"); + + _config.setOption(help); + _config.setOption(version); + _config.setOption(configFile); + } + + public State getState() + { + return _state; + } + + public Map<String, Command> getCommands() + { + return _commands; + } + + public void setConfigurationFile(String configfile) throws Configuration.InitException + { + _config.loadConfig(new File(configfile)); + setup(); + } + + public Console getConsole() + { + return _console; + } + + public void setConsole(Console console) + { + _console = console; + } + + /** + * Simple ShutdownHook to cleanly shutdown the databases + */ + static class ShutdownHook implements Runnable + { + MessageStoreTool _tool; + + ShutdownHook(MessageStoreTool messageStoreTool) + { + _tool = messageStoreTool; + } + + public void run() + { + _tool.quit(); + } + } + + public void quit() + { + _running = false; + + if (_initialised) + { + ApplicationRegistry.remove(1); + } + + _console.println("...exiting"); + + _console.close(); + } + + public void setBatchMode(boolean batchmode) + { + _batchMode = batchmode; + } + + /** + * Main loop + */ + protected void start() + { + setup(); + + if (!_initialised) + { + System.exit(1); + } + + _console.println(""); + + _console.println(BOILER_PLATE); + + runCLI(); + } + + private void setup() + { + loadDefaultVirtualHosts(); + + loadCommands(); + + _state.clearAll(); + } + + private void loadCommands() + { + _commands.clear(); + //todo Dynamically load the classes that exis in com.redhat.etp.qpid.commands + _commands.put("close", new Clear(this)); + _commands.put("copy", new Copy(this)); + _commands.put("dump", new Dump(this)); + _commands.put("help", new Help(this)); + _commands.put("list", new List(this)); + _commands.put("load", new Load(this)); + _commands.put("move", new Move(this)); + _commands.put("purge", new Purge(this)); + _commands.put("quit", new Quit(this)); + _commands.put("select", new Select(this)); + _commands.put("show", new Show(this)); + } + + private void loadDefaultVirtualHosts() + { + final File configFile = _config.getConfigFile(); + + loadVirtualHosts(configFile); + } + + private void loadVirtualHosts(File configFile) + { + + if (!configFile.exists()) + { + _devlog.error("Config file not found:" + configFile.getAbsolutePath()); + return; + } + else + { + _devlog.debug("using config file :" + configFile.getAbsolutePath()); + } + + try + { + ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile); + + ApplicationRegistry.remove(1); + + ApplicationRegistry.initialise(registry); + + checkMessageStores(); + _initialised = true; + } + catch (ConfigurationException e) + { + _console.println("Unable to load configuration due to configuration error: " + e.getMessage()); + e.printStackTrace(); + } + catch (Exception e) + { + _console.println("Unable to load configuration due to: " + e.getMessage()); + e.printStackTrace(); + } + + + } + + private void checkMessageStores() + { + Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts(); + + boolean warning = false; + for (VirtualHost vhost : vhosts) + { + if (vhost.getMessageStore() instanceof MemoryMessageStore) + { + _console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. " + + "Changes will not persist."); + warning = true; + } + } + + if (warning) + { + _console.println(""); + _console.println("Please ensure you are using the correct config file currently using '" + + _config.getConfigFile().getAbsolutePath() + "'"); + _console.println("New config file can be specifed by 'load <config file>' or -c on the commandline."); + _console.println(""); + } + } + + private void runCLI() + { + while (_running) + { + if (!_batchMode) + { + printPrompt(); + } + + String[] args = _console.readCommand(); + + while (args != null) + { + exec(args); + + if (_running) + { + if (!_batchMode) + { + printPrompt(); + } + + args = _console.readCommand(); + } + } + } + } + + private void printPrompt() + { + _console.print(prompt()); + } + + + /** + * Execute a script (batch mode). + * + * @param script The file script + */ + protected void runScripts(String script) + { + //Store Current State + boolean oldBatch = _batchMode; + CommandParser oldParser = _console.getCommandParser(); + setBatchMode(true); + + try + { + _devlog.debug("Running script '" + script + "'"); + + _console.setCommandParser(new SimpleCommandParser(new BufferedReader(new FileReader(script)))); + + start(); + } + catch (java.io.FileNotFoundException e) + { + _devlog.error("Script not found: '" + script + "' due to:" + e.getMessage()); + } + + //Restore previous state + _console.setCommandParser(oldParser); + setBatchMode(oldBatch); + } + + public String prompt() + { + String state = _state.toString(); + if (state != null && state.length() != 0) + { + return state + ":bdb$ "; + } + else + { + return "bdb$ "; + } + } + + /** + * Execute the command. + * + * @param args [command, arg0, arg1...]. + */ + protected void exec(String[] args) + { + // Comment lines start with a # + if (args.length == 0 || args[0].startsWith("#")) + { + return; + } + + final String command = args[0]; + + Command cmd = _commands.get(command); + + if (cmd == null) + { + _console.println("Command not understood: " + command); + } + else + { + cmd.execute(args); + } + } + + + /** + * Displays usage info. + */ + protected static void help() + { + System.out.println(BOILER_PLATE); + System.out.println("Usage: java " + MessageStoreTool.class + " [Options]"); + System.out.println(" [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\""); + } + + + /** + * This class is used to store the current state of the tool. + * + * This is then interrogated by the various commands to augment their behaviour. + * + * + */ + public static class State + { + private VirtualHost _vhost = null; + private AMQQueue _queue = null; + private Exchange _exchange = null; + private java.util.List<Long> _msgids = null; + + public State() + { + } + + public void setQueue(AMQQueue queue) + { + _queue = queue; + } + + public AMQQueue getQueue() + { + return _queue; + } + + public void setVhost(VirtualHost vhost) + { + _vhost = vhost; + } + + public VirtualHost getVhost() + { + return _vhost; + } + + public Exchange getExchange() + { + return _exchange; + } + + public void setExchange(Exchange exchange) + { + _exchange = exchange; + } + + public String toString() + { + StringBuilder status = new StringBuilder(); + + if (_vhost != null) + { + status.append(_vhost.getName()); + + if (_exchange != null) + { + status.append("["); + status.append(_exchange.getNameShortString()); + status.append("]"); + + if (_queue != null) + { + status.append("->'"); + status.append(_queue.getNameShortString()); + status.append("'"); + + if (_msgids != null) + { + status.append(printMessages()); + } + } + } + } + + return status.toString(); + } + + + public String printMessages() + { + StringBuilder sb = new StringBuilder(); + + Long previous = null; + + Long start = null; + for (Long id : _msgids) + { + if (previous != null) + { + if (id == previous + 1) + { + if (start == null) + { + start = previous; + } + } + else + { + if (start != null) + { + sb.append(","); + sb.append(start); + sb.append("-"); + sb.append(id); + start = null; + } + else + { + sb.append(","); + sb.append(previous); + } + } + } + + previous = id; + } + + if (start != null) + { + sb.append(","); + sb.append(start); + sb.append("-"); + sb.append(_msgids.get(_msgids.size() - 1)); + } + else + { + sb.append(","); + sb.append(previous); + } + + // surround list in () + sb.replace(0, 1, "("); + sb.append(")"); + return sb.toString(); + } + + public void clearAll() + { + _vhost = null; + clearExchange(); + } + + public void clearExchange() + { + _exchange = null; + clearQueue(); + } + + public void clearQueue() + { + _queue = null; + clearMessages(); + } + + public void clearMessages() + { + _msgids = null; + } + + /** + * A common location to provide parsing of the message id string + * utilised by a number of the commands. + * The String is comma separated list of ids that can be individual ids + * or a range (4-10) + * + * @param msgString string of msg ids to parse 1,2,4-10 + */ + public void setMessages(String msgString) + { + StringTokenizer tok = new StringTokenizer(msgString, ","); + + if (tok.hasMoreTokens()) + { + _msgids = new LinkedList<Long>(); + } + + while (tok.hasMoreTokens()) + { + String next = tok.nextToken(); + if (next.contains("-")) + { + Long start = Long.parseLong(next.substring(0, next.indexOf("-"))); + Long end = Long.parseLong(next.substring(next.indexOf("-") + 1)); + + if (end >= start) + { + for (long l = start; l <= end; l++) + { + _msgids.add(l); + } + } + } + else + { + _msgids.add(Long.parseLong(next)); + } + } + + } + + public void setMessages(java.util.List<Long> msgids) + { + _msgids = msgids; + } + + public java.util.List<Long> getMessages() + { + return _msgids; + } + }//Class State + +}//Class MessageStoreTool |