summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java66
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java85
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java305
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java98
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java314
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java94
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java202
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java67
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java233
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java516
13 files changed, 2129 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
new file mode 100644
index 0000000000..5444197cb4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+public abstract class AbstractCommand implements Command
+{
+ protected Console _console;
+ protected MessageStoreTool _tool;
+
+ public AbstractCommand(MessageStoreTool tool)
+ {
+ _console = tool.getConsole();
+ _tool = tool;
+ }
+
+ public void setOutput(Console out)
+ {
+ _console = out;
+ }
+
+ protected void commandError(String message, String[] args)
+ {
+ _console.print(getCommand() + " : " + message);
+
+ if (args != null)
+ {
+ for (int i = 1; i < args.length; i++)
+ {
+ _console.print(args[i]);
+ }
+ }
+ _console.println("");
+ _console.println(help());
+ }
+
+
+ public abstract String help();
+
+ public abstract String usage();
+
+ public abstract String getCommand();
+
+
+ public abstract void execute(String... args);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java
new file mode 100644
index 0000000000..b0006b3fe6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Clear extends AbstractCommand
+{
+ public Clear(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Clears any selection.";
+ }
+
+ public String usage()
+ {
+ return "clear [ all | virtualhost | exchange | queue | msgs ]";
+ }
+
+ public String getCommand()
+ {
+ return "clear";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length < 1)
+ {
+ doClose("all");
+ }
+ else
+ {
+ doClose(args[1]);
+ }
+ }
+
+ private void doClose(String type)
+ {
+ if (type.equals("virtualhost")
+ || type.equals("all"))
+ {
+ _tool.getState().clearAll();
+ }
+
+ if (type.equals("exchange"))
+ {
+ _tool.getState().clearExchange();
+ }
+
+ if (type.equals("queue"))
+ {
+ _tool.getState().clearQueue();
+ }
+
+ if (type.equals("msgs"))
+ {
+ _tool.getState().clearMessages();
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java
new file mode 100644
index 0000000000..bfa775a34a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.utils.Console;
+
+public interface Command
+{
+ public void setOutput(Console out);
+
+ public String help();
+
+ public abstract String usage();
+
+ String getCommand();
+
+ public void execute(String... args);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
new file mode 100644
index 0000000000..348c95572d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+
+public class Copy extends Move
+{
+ public Copy(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Copy messages between queues.";/*\n" +
+ "The currently selected message set will be copied to the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line."; */
+ }
+
+ public String usage()
+ {
+ return "copy to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "copy";
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
+ {
+ ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+ fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getNameShortString().toString(), txn);
+ txn.commit();
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
new file mode 100644
index 0000000000..8bb5d02b01
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.qpid.server.queue.QueueEntryImpl;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.List;
+
+public class Dump extends Show
+{
+ private static final int LINE_SIZE = 8;
+ private static final String DEFAULT_ENCODING = "utf-8";
+ private static final boolean SPACE_BYTES = true;
+ private static final String BYTE_SPACER = " ";
+ private static final String NON_PRINTING_ASCII_CHAR = "?";
+
+ protected boolean _content = true;
+
+ public Dump(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Dump selected message content. Default: show=content";
+ }
+
+ public String usage()
+ {
+ return getCommand() + " [show=[all],[msgheaders],[_amqHeaders],[routing],[content]] [id=<msgid e.g. 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "dump";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("show="))
+ {
+ _content = arg.contains("content") || arg.contains("all");
+ }
+ }
+
+ parseArgs(args);
+ }
+
+ performShow();
+ }
+
+
+ protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
+ boolean showMessageHeaders)
+ {
+
+ List<List> display = new LinkedList<List>();
+
+ List<String> hex = new LinkedList<String>();
+ List<String> ascii = new LinkedList<String>();
+ display.add(hex);
+ display.add(ascii);
+
+ for (QueueEntry entry : messages)
+ {
+ ServerMessage msg = entry.getMessage();
+ if (!includeMsg(msg, msgids))
+ {
+ continue;
+ }
+
+ //Add divider between messages
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ // Show general message information
+ hex.add(Show.Columns.ID.name());
+ ascii.add(msg.getMessageNumber().toString());
+
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ if (showRouting)
+ {
+ addShowInformation(hex, ascii, msg, "Routing Details", true, false, false);
+ }
+ if (showHeaders)
+ {
+ addShowInformation(hex, ascii, msg, "Headers", false, true, false);
+ }
+ if (showMessageHeaders)
+ {
+ addShowInformation(hex, ascii, msg, null, false, false, true);
+ }
+
+ // Add Content Body section
+ hex.add("Content Body");
+ ascii.add("");
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+
+ final int messageSize = (int) msg.getSize();
+ if (messageSize != 0)
+ {
+ hex.add("Hex");
+ hex.add(Console.ROW_DIVIDER);
+
+
+ ascii.add("ASCII");
+ ascii.add(Console.ROW_DIVIDER);
+
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(64 * 1024);
+
+ int position = 0;
+
+ while(position < messageSize)
+ {
+
+ position += msg.getContent(buf, position);
+ buf.flip();
+ //Duplicate so we don't destroy original data :)
+ java.nio.ByteBuffer hexBuffer = buf;
+
+ java.nio.ByteBuffer charBuffer = hexBuffer.duplicate();
+
+ Hex hexencoder = new Hex();
+
+ while (hexBuffer.hasRemaining())
+ {
+ byte[] line = new byte[LINE_SIZE];
+
+ int bufsize = hexBuffer.remaining();
+ if (bufsize < LINE_SIZE)
+ {
+ hexBuffer.get(line, 0, bufsize);
+ }
+ else
+ {
+ bufsize = line.length;
+ hexBuffer.get(line);
+ }
+
+ byte[] encoded = hexencoder.encode(line);
+
+ try
+ {
+ String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
+ String hexLine = "";
+
+ int strLength = encStr.length();
+ for (int c = 0; c < strLength; c++)
+ {
+ hexLine += encStr.charAt(c);
+
+ if ((c & 1) == 1 && SPACE_BYTES)
+ {
+ hexLine += BYTE_SPACER;
+ }
+ }
+
+ hex.add(hexLine);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ _console.println(e.getMessage());
+ return null;
+ }
+ }
+
+ while (charBuffer.hasRemaining())
+ {
+ String asciiLine = "";
+
+ for (int pos = 0; pos < LINE_SIZE; pos++)
+ {
+ if (charBuffer.hasRemaining())
+ {
+ byte ch = charBuffer.get();
+
+ if (isPrintable(ch))
+ {
+ asciiLine += (char) ch;
+ }
+ else
+ {
+ asciiLine += NON_PRINTING_ASCII_CHAR;
+ }
+
+ if (SPACE_BYTES)
+ {
+ asciiLine += BYTE_SPACER;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ ascii.add(asciiLine);
+ }
+ buf.clear();
+ }
+ }
+ else
+ {
+ List<String> result = new LinkedList<String>();
+
+ display.add(result);
+ result.add("No ContentBodies");
+ }
+ }
+
+ // if hex is empty then we have no data to display
+ if (hex.size() == 0)
+ {
+ return null;
+ }
+
+ return display;
+ }
+
+ private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg,
+ String title, boolean routing, boolean headers, boolean messageHeaders)
+ {
+ List<QueueEntry> single = new LinkedList<QueueEntry>();
+ single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE));
+
+ List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
+
+ //Reformat data
+ if (title != null)
+ {
+ column1.add(title);
+ column2.add("");
+ column1.add(Console.ROW_DIVIDER);
+ column2.add(Console.ROW_DIVIDER);
+ }
+
+ // look at all columns in the routing Data
+ for (List item : routingData)
+ {
+ // the item should be:
+ // Title
+ // *divider
+ // value
+ // otherwise we can't reason about the correct value
+ if (item.size() == 3)
+ {
+ //Filter out the columns we are not interested in.
+
+ String columnName = item.get(0).toString();
+
+ if (!(columnName.equals(Show.Columns.ID.name())
+ || columnName.equals(Show.Columns.Size.name())))
+ {
+ column1.add(columnName);
+ column2.add(item.get(2).toString());
+ }
+ }
+ }
+ column1.add(Console.ROW_DIVIDER);
+ column2.add(Console.ROW_DIVIDER);
+ }
+
+ private boolean isPrintable(byte c)
+ {
+ return c > 31 && c < 127;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java
new file mode 100644
index 0000000000..0f9546541b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+public class Help extends AbstractCommand
+{
+ public Help(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Provides detailed help on commands.";
+ }
+
+ public String getCommand()
+ {
+ return "help";
+ }
+
+ public String usage()
+ {
+ return "help [<command>]";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 1)
+ {
+ Command command = _tool.getCommands().get(args[1]);
+ if (command != null)
+ {
+ _console.println(command.help());
+ _console.println("Usage:" + command.usage());
+ }
+ else
+ {
+ commandError("Command not found: ", args);
+ }
+ }
+ else
+ {
+ java.util.List<java.util.List> data = new LinkedList<java.util.List>();
+
+ java.util.List<String> commandName = new LinkedList<String>();
+ java.util.List<String> commandDescription = new LinkedList<String>();
+
+ data.add(commandName);
+ data.add(commandDescription);
+
+ //Set up Headers
+ commandName.add("Command");
+ commandDescription.add("Description");
+
+ commandName.add(Console.ROW_DIVIDER);
+ commandDescription.add(Console.ROW_DIVIDER);
+
+ //Add current Commands with descriptions
+ Map<String, Command> commands = _tool.getCommands();
+
+ for (Command command : commands.values())
+ {
+ commandName.add(command.getCommand());
+ commandDescription.add(command.help());
+ }
+
+ _console.printMap("Available Commands", data);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
new file mode 100644
index 0000000000..3c4a0c8fac
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+public class List extends AbstractCommand
+{
+
+ public List(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public void setOutput(Console out)
+ {
+ _console = out;
+ }
+
+ public String help()
+ {
+ return "list available items.";
+ }
+
+ public String usage()
+ {
+ return "list queues [<exchange>] | exchanges | bindings [<exchange>] | all";
+ }
+
+ public String getCommand()
+ {
+ return "list";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 1)
+ {
+ if ((args[1].equals("exchanges"))
+ || (args[1].equals("queues"))
+ || (args[1].equals("bindings"))
+ || (args[1].equals("all")))
+ {
+ if (args.length == 2)
+ {
+ doList(args[1]);
+ }
+ else if (args.length == 3)
+ {
+ doList(args[1], args[2]);
+ }
+ }
+ else
+ {
+ commandError("Unknown options. ", args);
+ }
+ }
+ else if (args.length < 2)
+ {
+ doList("all");
+ }
+ else
+ {
+ doList(args[1]);
+ }
+ }
+
+ private void doList(String... listItem)
+ {
+ if (_tool.getState().getVhost() == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ listVirtualHosts();
+ return;
+ }
+
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ java.util.List<String> data = null;
+
+ if (listItem[0].equals("queues"))
+ {
+ if (listItem.length > 1)
+ {
+ data = listQueues(vhost, new AMQShortString(listItem[1]));
+ }
+ else
+ {
+ Exchange exchange = _tool.getState().getExchange();
+ data = listQueues(vhost, exchange);
+ }
+ }
+
+ if (listItem[0].equals("exchanges"))
+ {
+ data = listExchanges(vhost);
+ }
+
+ if (listItem[0].equals("bindings"))
+ {
+
+ if (listItem.length > 1)
+ {
+ data = listBindings(vhost, new AMQShortString(listItem[1]));
+ }
+ else
+ {
+ Exchange exchange = _tool.getState().getExchange();
+
+ data = listBindings(vhost, exchange);
+ }
+ }
+
+ if (data != null)
+ {
+ if (data.size() == 1)
+ {
+ _console.println("No '" + listItem[0] + "' to display,");
+ }
+ else
+ {
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+ }
+
+
+ if (listItem[0].equals("all"))
+ {
+
+ boolean displayed = false;
+ Exchange exchange = _tool.getState().getExchange();
+
+ //Do the display here for each one so that they are pretty printed
+ data = listQueues(vhost, exchange);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+
+ if (exchange == null)
+ {
+ data = listExchanges(vhost);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+ }
+
+ data = listBindings(vhost, exchange);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+
+ if (!displayed)
+ {
+ _console.println("Nothing to list");
+ }
+ }
+ }
+
+ private void listVirtualHosts()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance()
+ .getVirtualHostRegistry().getVirtualHosts();
+
+ String[] data = new String[vhosts.size() + 1];
+
+ data[0] = "Available VirtualHosts";
+
+ int index = 1;
+ for (VirtualHost vhost : vhosts)
+ {
+ data[index] = vhost.getName();
+ index++;
+ }
+
+ _console.displayList(true, data);
+ }
+
+ private java.util.List<String> listBindings(VirtualHost vhost, AMQShortString exchangeName)
+ {
+ return listBindings(vhost, vhost.getExchangeRegistry().getExchange(exchangeName));
+ }
+
+ private java.util.List<String> listBindings(VirtualHost vhost, Exchange exchange)
+ {
+ Collection<AMQShortString> queues = vhost.getQueueRegistry().getQueueNames();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Current Bindings");
+
+ for (AMQShortString queue : queues)
+ {
+ if (exchange != null)
+ {
+ if (exchange.isBound(queue))
+ {
+ data.add(queue.toString());
+ }
+ }
+ else
+ {
+ data.add(queue.toString());
+ }
+ }
+
+ return data;
+ }
+
+ private java.util.List<String> listExchanges(VirtualHost vhost)
+ {
+ Collection<AMQShortString> queues = vhost.getExchangeRegistry().getExchangeNames();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Available Exchanges");
+
+ for (AMQShortString queue : queues)
+ {
+ data.add(queue.toString());
+ }
+
+ return data;
+ }
+
+ private java.util.List<String> listQueues(VirtualHost vhost, AMQShortString exchangeName)
+ {
+ return listQueues(vhost, vhost.getExchangeRegistry().getExchange(exchangeName));
+ }
+
+ private java.util.List<String> listQueues(VirtualHost vhost, Exchange exchange)
+ {
+ Collection<AMQQueue> queues = vhost.getQueueRegistry().getQueues();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Available Queues");
+
+ for (AMQQueue queue : queues)
+ {
+ if (exchange != null)
+ {
+ if (exchange.isBound(queue))
+ {
+ data.add(queue.getNameShortString().toString());
+ }
+ }
+ else
+ {
+ data.add(queue.getNameShortString().toString());
+ }
+ }
+
+ if (exchange != null)
+ {
+ if (queues.size() == 1)
+ {
+ return null;
+ }
+ }
+
+ return data;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java
new file mode 100644
index 0000000000..244a311c30
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Load extends AbstractCommand
+{
+ public Load(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Loads specified broker configuration file.";
+ }
+
+ public String usage()
+ {
+ return "load <configuration file>";
+ }
+
+ public String getCommand()
+ {
+ return "load";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 2)
+ {
+ _console.print("load " + args[1] + ": additional options not understood:");
+ for (int i = 2; i < args.length; i++)
+ {
+ _console.print(args[i] + " ");
+ }
+ _console.println("");
+ }
+ else if (args.length < 2)
+ {
+ _console.println("Enter Configuration file.");
+ String input = _console.readln();
+ if (input != null)
+ {
+ doLoad(input);
+ }
+ else
+ {
+ _console.println("Did not recognise config file.");
+ }
+ }
+ else
+ {
+ doLoad(args[1]);
+ }
+ }
+
+ private void doLoad(String configfile)
+ {
+ _console.println("Loading Configuration:" + configfile);
+
+ try
+ {
+ _tool.setConfigurationFile(configfile);
+ }
+ catch (Configuration.InitException e)
+ {
+ _console.println("Unable to open config file due to: '" + e.getMessage() + "'");
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
new file mode 100644
index 0000000000..615f6ec1c2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class Move extends AbstractCommand
+{
+
+ public Move(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Move messages between queues.";/*\n" +
+ "The currently selected message set will be moved to the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line.";*/
+ }
+
+ public String usage()
+ {
+ return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "move";
+ }
+
+ public void execute(String... args)
+ {
+ AMQQueue toQueue = null;
+ AMQQueue fromQueue = _tool.getState().getQueue();
+ java.util.List<Long> msgids = _tool.getState().getMessages();
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("to="))
+ {
+ String queueName = arg.substring(arg.indexOf("=") + 1);
+ toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ }
+
+ if (arg.startsWith("from="))
+ {
+ String queueName = arg.substring(arg.indexOf("=") + 1);
+ fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ }
+
+ if (arg.startsWith("msgids="))
+ {
+ String msgidStr = arg.substring(arg.indexOf("=") + 1);
+
+ // Record the current message selection
+ java.util.List<Long> currentIDs = _tool.getState().getMessages();
+
+ // Use the ToolState class to perform the messasge parsing
+ _tool.getState().setMessages(msgidStr);
+ msgids = _tool.getState().getMessages();
+
+ // Reset the original selection of messages
+ _tool.getState().setMessages(currentIDs);
+ }
+ }
+ }
+
+ if (!checkRequirements(fromQueue, toQueue, msgids))
+ {
+ return;
+ }
+
+ processIDs(fromQueue, toQueue, msgids);
+ }
+
+ private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
+ {
+ Long previous = null;
+ Long start = null;
+
+ if (msgids == null)
+ {
+ msgids = allMessageIDs(fromQueue);
+ }
+
+ if (msgids == null || msgids.size() == 0)
+ {
+ _console.println("No Messages to move.");
+ return;
+ }
+
+ for (long id : msgids)
+ {
+ if (previous != null)
+ {
+ if (id == previous + 1)
+ {
+ if (start == null)
+ {
+ start = previous;
+ }
+ }
+ else
+ {
+ if (start != null)
+ {
+ //move a range of ids
+ doCommand(fromQueue, start, id, toQueue);
+ start = null;
+ }
+ else
+ {
+ //move a single id
+ doCommand(fromQueue, id, id, toQueue);
+ }
+ }
+ }
+
+ previous = id;
+ }
+
+ if (start != null)
+ {
+ //move a range of ids
+ doCommand(fromQueue, start, previous, toQueue);
+ }
+ }
+
+ private List<Long> allMessageIDs(AMQQueue fromQueue)
+ {
+ List<Long> ids = new LinkedList<Long>();
+
+ if (fromQueue != null)
+ {
+ List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
+ if (messages != null)
+ {
+ for (QueueEntry msg : messages)
+ {
+ ids.add(msg.getMessage().getMessageNumber());
+ }
+ }
+ }
+
+ return ids;
+ }
+
+ protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids)
+ {
+ if (toQueue == null)
+ {
+ _console.println("Destination queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ if (fromQueue == null)
+ {
+ _console.println("Source queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue)
+ {
+ ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+ fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getNameShortString().toString(), txn);
+ txn.commit();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
new file mode 100644
index 0000000000..8df4afa2db
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class Purge extends Move
+{
+ public Purge(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Purge messages from a queue.\n" +
+ "The currently selected message set will be purged from the specifed queue.\n" +
+ "Alternatively the values can be provided on the command line.";
+ }
+
+ public String usage()
+ {
+ return "purge from=<queue> [msgids=<msgids eg, 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "purge";
+ }
+
+
+ protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids)
+ {
+ if (fromQueue == null)
+ {
+ _console.println("Source queue not specifed.");
+ _console.println(usage());
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
+ {
+ fromQueue.removeMessagesFromQueue(start, end);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java
new file mode 100644
index 0000000000..a81bc07c38
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Quit extends AbstractCommand
+{
+ public Quit(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Quit the tool.";
+ }
+
+ public String usage()
+ {
+ return "quit";
+ }
+
+ public String getCommand()
+ {
+ return "quit";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals("quit");
+
+ _tool.quit();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
new file mode 100644
index 0000000000..ff59568374
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+import java.util.LinkedList;
+import java.util.StringTokenizer;
+
+public class Select extends AbstractCommand
+{
+
+ public Select(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Perform a selection.";
+ }
+
+ public String usage()
+ {
+ return "select virtualhost <name> |exchange <name> |queue <name> | msg id=<msgids eg. 1,2,4-10>";
+ }
+
+ public String getCommand()
+ {
+ return "select";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 2;
+ assert args[0].equals("select");
+
+ if (args.length < 3)
+ {
+ if (args[1].equals("show"))
+ {
+ doSelect(args[1], null);
+ }
+ else
+ {
+ _console.print("select : unknown command:");
+ _console.println(help());
+ }
+ }
+ else
+ {
+ if (args[1].equals("virtualhost")
+ || args[1].equals("vhost")
+ || args[1].equals("exchange")
+ || args[1].equals("queue")
+ || args[1].equals("msg")
+ )
+ {
+ doSelect(args[1], args[2]);
+ }
+ else
+ {
+ _console.println(help());
+ }
+ }
+ }
+
+ private void doSelect(String type, String item)
+ {
+ if (type.equals("virtualhost"))
+ {
+
+ VirtualHost vhost = ApplicationRegistry.getInstance()
+ .getVirtualHostRegistry().getVirtualHost(item);
+
+ if (vhost == null)
+ {
+ _console.println("Virtualhost '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setVhost(vhost);
+ }
+ }
+
+ if (type.equals("exchange"))
+ {
+
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ if (vhost == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ return;
+ }
+
+
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(new AMQShortString(item));
+
+ if (exchange == null)
+ {
+ _console.println("Exchange '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setExchange(exchange);
+ }
+
+ if (_tool.getState().getQueue() != null)
+ {
+ if (!exchange.isBound(_tool.getState().getQueue()))
+ {
+ _tool.getState().setQueue(null);
+ }
+ }
+ }
+
+ if (type.equals("queue"))
+ {
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ if (vhost == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ return;
+ }
+
+ AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(item));
+
+ if (queue == null)
+ {
+ _console.println("Queue '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setQueue(queue);
+
+ if (_tool.getState().getExchange() == null)
+ {
+ for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames())
+ {
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ if (exchange.isBound(queue))
+ {
+ _tool.getState().setExchange(exchange);
+ break;
+ }
+ }
+ }
+
+ //remove the message selection
+ _tool.getState().setMessages((java.util.List<Long>) null);
+ }
+ }
+
+ if (type.equals("msg"))
+ {
+ if (item.startsWith("id="))
+ {
+ StringTokenizer tok = new StringTokenizer(item.substring(item.indexOf("=") + 1), ",");
+
+ java.util.List<Long> msgids = null;
+
+ if (tok.hasMoreTokens())
+ {
+ msgids = new LinkedList<Long>();
+ }
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+ if (next.contains("-"))
+ {
+ Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
+ Long end = Long.parseLong(next.substring(next.indexOf("-") + 1));
+
+ if (end >= start)
+ {
+ for (long l = start; l <= end; l++)
+ {
+ msgids.add(l);
+ }
+ }
+ }
+ else
+ {
+ msgids.add(Long.parseLong(next));
+ }
+ }
+
+ _tool.getState().setMessages(msgids);
+ }
+
+ }
+
+ if (type.equals("show"))
+ {
+ _console.println(_tool.getState().toString());
+ if (_tool.getState().getMessages() != null)
+ {
+ _console.print("Msgs:");
+ for (Long l : _tool.getState().getMessages())
+ {
+ _console.print(" " + l);
+ }
+ _console.println("");
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
new file mode 100644
index 0000000000..806e161bbc
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class Show extends AbstractCommand
+{
+ protected boolean _amqHeaders = false;
+ protected boolean _routing = false;
+ protected boolean _msgHeaders = false;
+
+ public Show(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Shows the messages headers.";
+ }
+
+ public String usage()
+ {
+ return getCommand() + " [show=[all],[msgheaders],[amqheaders],[routing]] [id=<msgid e.g. 1,2,4-10>]";
+ }
+
+ public String getCommand()
+ {
+ return "show";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length < 2)
+ {
+ parseArgs("all");
+ }
+ else
+ {
+ parseArgs(args);
+ }
+
+ performShow();
+ }
+
+ protected void parseArgs(String... args)
+ {
+ List<Long> msgids = null;
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("show="))
+ {
+ _msgHeaders = arg.contains("msgheaders") || arg.contains("all");
+ _amqHeaders = arg.contains("amqheaders") || arg.contains("all");
+ _routing = arg.contains("routing") || arg.contains("all");
+ }
+
+ if (arg.startsWith("id="))
+ {
+ _tool.getState().setMessages(msgids);
+ }
+ }//for args
+ }// if args > 2
+ }
+
+ protected void performShow()
+ {
+ if (_tool.getState().getVhost() == null)
+ {
+ _console.println("No Virtualhost selected. 'DuSelect' a Virtualhost first.");
+ return;
+ }
+
+ AMQQueue _queue = _tool.getState().getQueue();
+
+ List<Long> msgids = _tool.getState().getMessages();
+
+ if (_queue != null)
+ {
+ List<QueueEntry> messages = _queue.getMessagesOnTheQueue();
+ if (messages == null || messages.size() == 0)
+ {
+ _console.println("No messages on queue");
+ return;
+ }
+
+ List<List> data = createMessageData(msgids, messages, _amqHeaders, _routing, _msgHeaders);
+ if (data != null)
+ {
+ _console.printMap(null, data);
+ }
+ else
+ {
+ String message = "No data to display.";
+ if (msgids != null)
+ {
+ message += " Is message selection correct? " + _tool.getState().printMessages();
+ }
+ _console.println(message);
+ }
+
+ }
+ else
+ {
+ _console.println("No Queue specified to show.");
+ }
+ }
+
+ /**
+ * Create the list data for display from the messages.
+ *
+ * @param msgids The list of message ids to display
+ * @param messages A list of messages to format and display.
+ * @param showHeaders should the header info be shown
+ * @param showRouting show the routing info be shown
+ * @param showMessageHeaders show the msg headers be shown
+ * @return the formated data lists for printing
+ */
+ protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
+ boolean showMessageHeaders)
+ {
+
+ // Currenly exposed message properties
+// //Printing the content Body
+// msg.getContentBodyIterator();
+// //Print the Headers
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppIdAsString();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getClusterId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getContentType();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getCorrelationId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getDeliveryMode();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageNumber();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getTimestamp();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getType();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getUserId();
+//
+// //Print out all the property names
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames();
+//
+// msg.getMessageNumber();
+// msg.getSize();
+// msg.getArrivalTime();
+
+// msg.getDeliveredSubscription();
+// msg.getDeliveredToConsumer();
+// msg.getMessageHandle();
+// msg.getMessageNumber();
+// msg.getMessagePublishInfo();
+// msg.getPublisher();
+
+// msg.getStoreContext();
+// msg.isAllContentReceived();
+// msg.isPersistent();
+// msg.isRedelivered();
+// msg.isRejectedBy();
+// msg.isTaken();
+
+ //Header setup
+
+ List<List> data = new LinkedList<List>();
+
+ List<String> id = new LinkedList<String>();
+ data.add(id);
+ id.add(Columns.ID.name());
+ id.add(Console.ROW_DIVIDER);
+
+ List<String> exchange = new LinkedList<String>();
+ List<String> routingkey = new LinkedList<String>();
+ List<String> immediate = new LinkedList<String>();
+ List<String> mandatory = new LinkedList<String>();
+ if (showRouting)
+ {
+ data.add(exchange);
+ exchange.add(Columns.Exchange.name());
+ exchange.add(Console.ROW_DIVIDER);
+
+ data.add(routingkey);
+ routingkey.add(Columns.RoutingKey.name());
+ routingkey.add(Console.ROW_DIVIDER);
+
+ data.add(immediate);
+ immediate.add(Columns.isImmediate.name());
+ immediate.add(Console.ROW_DIVIDER);
+
+ data.add(mandatory);
+ mandatory.add(Columns.isMandatory.name());
+ mandatory.add(Console.ROW_DIVIDER);
+ }
+
+ List<String> size = new LinkedList<String>();
+ List<String> appid = new LinkedList<String>();
+ List<String> clusterid = new LinkedList<String>();
+ List<String> contenttype = new LinkedList<String>();
+ List<String> correlationid = new LinkedList<String>();
+ List<String> deliverymode = new LinkedList<String>();
+ List<String> encoding = new LinkedList<String>();
+ List<String> arrival = new LinkedList<String>();
+ List<String> expiration = new LinkedList<String>();
+ List<String> priority = new LinkedList<String>();
+ List<String> propertyflag = new LinkedList<String>();
+ List<String> replyto = new LinkedList<String>();
+ List<String> timestamp = new LinkedList<String>();
+ List<String> type = new LinkedList<String>();
+ List<String> userid = new LinkedList<String>();
+ List<String> ispersitent = new LinkedList<String>();
+ List<String> isredelivered = new LinkedList<String>();
+ List<String> isdelivered = new LinkedList<String>();
+
+ data.add(size);
+ size.add(Columns.Size.name());
+ size.add(Console.ROW_DIVIDER);
+
+ if (showHeaders)
+ {
+ data.add(ispersitent);
+ ispersitent.add(Columns.isPersistent.name());
+ ispersitent.add(Console.ROW_DIVIDER);
+
+ data.add(isredelivered);
+ isredelivered.add(Columns.isRedelivered.name());
+ isredelivered.add(Console.ROW_DIVIDER);
+
+ data.add(isdelivered);
+ isdelivered.add(Columns.isDelivered.name());
+ isdelivered.add(Console.ROW_DIVIDER);
+
+ data.add(appid);
+ appid.add(Columns.App_ID.name());
+ appid.add(Console.ROW_DIVIDER);
+
+ data.add(clusterid);
+ clusterid.add(Columns.Cluster_ID.name());
+ clusterid.add(Console.ROW_DIVIDER);
+
+ data.add(contenttype);
+ contenttype.add(Columns.Content_Type.name());
+ contenttype.add(Console.ROW_DIVIDER);
+
+ data.add(correlationid);
+ correlationid.add(Columns.Correlation_ID.name());
+ correlationid.add(Console.ROW_DIVIDER);
+
+ data.add(deliverymode);
+ deliverymode.add(Columns.Delivery_Mode.name());
+ deliverymode.add(Console.ROW_DIVIDER);
+
+ data.add(encoding);
+ encoding.add(Columns.Encoding.name());
+ encoding.add(Console.ROW_DIVIDER);
+
+ data.add(arrival);
+ expiration.add(Columns.Arrival.name());
+ expiration.add(Console.ROW_DIVIDER);
+
+ data.add(expiration);
+ expiration.add(Columns.Expiration.name());
+ expiration.add(Console.ROW_DIVIDER);
+
+ data.add(priority);
+ priority.add(Columns.Priority.name());
+ priority.add(Console.ROW_DIVIDER);
+
+ data.add(propertyflag);
+ propertyflag.add(Columns.Property_Flag.name());
+ propertyflag.add(Console.ROW_DIVIDER);
+
+ data.add(replyto);
+ replyto.add(Columns.ReplyTo.name());
+ replyto.add(Console.ROW_DIVIDER);
+
+ data.add(timestamp);
+ timestamp.add(Columns.Timestamp.name());
+ timestamp.add(Console.ROW_DIVIDER);
+
+ data.add(type);
+ type.add(Columns.Type.name());
+ type.add(Console.ROW_DIVIDER);
+
+ data.add(userid);
+ userid.add(Columns.UserID.name());
+ userid.add(Console.ROW_DIVIDER);
+ }
+
+ List<String> msgHeaders = new LinkedList<String>();
+ if (showMessageHeaders)
+ {
+ data.add(msgHeaders);
+ msgHeaders.add(Columns.MsgHeaders.name());
+ msgHeaders.add(Console.ROW_DIVIDER);
+ }
+
+ //Add create the table of data
+ for (QueueEntry entry : messages)
+ {
+ ServerMessage msg = entry.getMessage();
+ if (!includeMsg(msg, msgids))
+ {
+ continue;
+ }
+
+ id.add(msg.getMessageNumber().toString());
+
+ size.add("" + msg.getSize());
+
+ arrival.add("" + msg.getArrivalTime());
+
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
+
+
+ isredelivered.add(entry.isRedelivered() ? "true" : "false");
+
+ isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
+
+// msg.getMessageHandle();
+
+ BasicContentHeaderProperties headers = null;
+
+ try
+ {
+ if(msg instanceof AMQMessage)
+ {
+ headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties());
+ }
+ }
+ catch (AMQException e)
+ {
+ //ignore
+// commandError("Unable to read properties for message: " + e.getMessage(), null);
+ }
+
+ if (headers != null)
+ {
+ String appidS = headers.getAppIdAsString();
+ appid.add(appidS == null ? "null" : appidS);
+
+ String clusterS = headers.getClusterIdAsString();
+ clusterid.add(clusterS == null ? "null" : clusterS);
+
+ String contentS = headers.getContentTypeAsString();
+ contenttype.add(contentS == null ? "null" : contentS);
+
+ String correlationS = headers.getCorrelationIdAsString();
+ correlationid.add(correlationS == null ? "null" : correlationS);
+
+ deliverymode.add("" + headers.getDeliveryMode());
+
+ AMQShortString encodeSS = headers.getEncoding();
+ encoding.add(encodeSS == null ? "null" : encodeSS.toString());
+
+ expiration.add("" + headers.getExpiration());
+
+ FieldTable headerFT = headers.getHeaders();
+ msgHeaders.add(headerFT == null ? "none" : "" + headerFT.toString());
+
+ priority.add("" + headers.getPriority());
+ propertyflag.add("" + headers.getPropertyFlags());
+
+ AMQShortString replytoSS = headers.getReplyTo();
+ replyto.add(replytoSS == null ? "null" : replytoSS.toString());
+
+ timestamp.add("" + headers.getTimestamp());
+
+ AMQShortString typeSS = headers.getType();
+ type.add(typeSS == null ? "null" : typeSS.toString());
+
+ AMQShortString useridSS = headers.getUserId();
+ userid.add(useridSS == null ? "null" : useridSS.toString());
+
+ MessagePublishInfo info = null;
+ try
+ {
+ if(msg instanceof AMQMessage)
+ {
+ info = ((AMQMessage)msg).getMessagePublishInfo();
+ }
+
+ }
+ catch (AMQException e)
+ {
+ //ignore
+ }
+
+ if (info != null)
+ {
+ AMQShortString exchangeSS = info.getExchange();
+ exchange.add(exchangeSS == null ? "null" : exchangeSS.toString());
+
+ AMQShortString routingkeySS = info.getRoutingKey();
+ routingkey.add(routingkeySS == null ? "null" : routingkeySS.toString());
+
+ immediate.add(info.isImmediate() ? "true" : "false");
+ mandatory.add(info.isMandatory() ? "true" : "false");
+ }
+
+// msg.getPublisher(); -- only used in clustering
+// msg.getStoreContext();
+// msg.isAllContentReceived();
+
+ }// if headers!=null
+
+// need to access internal map and do lookups.
+// msg.isTaken();
+// msg.getDeliveredSubscription();
+// msg.isRejectedBy();
+
+ }
+
+ // if id only had the header and the divider in it then we have no data to display
+ if (id.size() == 2)
+ {
+ return null;
+ }
+ return data;
+ }
+
+ protected boolean includeMsg(ServerMessage msg, List<Long> msgids)
+ {
+ if (msgids == null)
+ {
+ return true;
+ }
+
+ Long msgid = msg.getMessageNumber();
+
+ boolean found = false;
+
+ if (msgids != null)
+ {
+ //check msgid is in msgids
+ for (Long l : msgids)
+ {
+ if (l.equals(msgid))
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+ return found;
+ }
+
+ public enum Columns
+ {
+ ID,
+ Size,
+ Exchange,
+ RoutingKey,
+ isImmediate,
+ isMandatory,
+ isPersistent,
+ isRedelivered,
+ isDelivered,
+ App_ID,
+ Cluster_ID,
+ Content_Type,
+ Correlation_ID,
+ Delivery_Mode,
+ Encoding,
+ Arrival,
+ Expiration,
+ Priority,
+ Property_Flag,
+ ReplyTo,
+ Timestamp,
+ Type,
+ UserID,
+ MsgHeaders
+ }
+}
+
+