diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands')
13 files changed, 2129 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java new file mode 100644 index 0000000000..5444197cb4 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +public abstract class AbstractCommand implements Command +{ + protected Console _console; + protected MessageStoreTool _tool; + + public AbstractCommand(MessageStoreTool tool) + { + _console = tool.getConsole(); + _tool = tool; + } + + public void setOutput(Console out) + { + _console = out; + } + + protected void commandError(String message, String[] args) + { + _console.print(getCommand() + " : " + message); + + if (args != null) + { + for (int i = 1; i < args.length; i++) + { + _console.print(args[i]); + } + } + _console.println(""); + _console.println(help()); + } + + + public abstract String help(); + + public abstract String usage(); + + public abstract String getCommand(); + + + public abstract void execute(String... args); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java new file mode 100644 index 0000000000..b0006b3fe6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Clear extends AbstractCommand +{ + public Clear(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Clears any selection."; + } + + public String usage() + { + return "clear [ all | virtualhost | exchange | queue | msgs ]"; + } + + public String getCommand() + { + return "clear"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length < 1) + { + doClose("all"); + } + else + { + doClose(args[1]); + } + } + + private void doClose(String type) + { + if (type.equals("virtualhost") + || type.equals("all")) + { + _tool.getState().clearAll(); + } + + if (type.equals("exchange")) + { + _tool.getState().clearExchange(); + } + + if (type.equals("queue")) + { + _tool.getState().clearQueue(); + } + + if (type.equals("msgs")) + { + _tool.getState().clearMessages(); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java new file mode 100644 index 0000000000..bfa775a34a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.utils.Console; + +public interface Command +{ + public void setOutput(Console out); + + public String help(); + + public abstract String usage(); + + String getCommand(); + + public void execute(String... args); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java new file mode 100644 index 0000000000..348c95572d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.LocalTransaction; + +public class Copy extends Move +{ + public Copy(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Copy messages between queues.";/*\n" + + "The currently selected message set will be copied to the specifed queue.\n" + + "Alternatively the values can be provided on the command line."; */ + } + + public String usage() + { + return "copy to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]"; + } + + public String getCommand() + { + return "copy"; + } + + protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue) + { + ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog()); + fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getNameShortString().toString(), txn); + txn.commit(); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java new file mode 100644 index 0000000000..8bb5d02b01 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.commons.codec.binary.Hex; +import org.apache.qpid.server.queue.QueueEntryImpl; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.List; + +public class Dump extends Show +{ + private static final int LINE_SIZE = 8; + private static final String DEFAULT_ENCODING = "utf-8"; + private static final boolean SPACE_BYTES = true; + private static final String BYTE_SPACER = " "; + private static final String NON_PRINTING_ASCII_CHAR = "?"; + + protected boolean _content = true; + + public Dump(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Dump selected message content. Default: show=content"; + } + + public String usage() + { + return getCommand() + " [show=[all],[msgheaders],[_amqHeaders],[routing],[content]] [id=<msgid e.g. 1,2,4-10>]"; + } + + public String getCommand() + { + return "dump"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + + if (args.length >= 2) + { + for (String arg : args) + { + if (arg.startsWith("show=")) + { + _content = arg.contains("content") || arg.contains("all"); + } + } + + parseArgs(args); + } + + performShow(); + } + + + protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting, + boolean showMessageHeaders) + { + + List<List> display = new LinkedList<List>(); + + List<String> hex = new LinkedList<String>(); + List<String> ascii = new LinkedList<String>(); + display.add(hex); + display.add(ascii); + + for (QueueEntry entry : messages) + { + ServerMessage msg = entry.getMessage(); + if (!includeMsg(msg, msgids)) + { + continue; + } + + //Add divider between messages + hex.add(Console.ROW_DIVIDER); + ascii.add(Console.ROW_DIVIDER); + + // Show general message information + hex.add(Show.Columns.ID.name()); + ascii.add(msg.getMessageNumber().toString()); + + hex.add(Console.ROW_DIVIDER); + ascii.add(Console.ROW_DIVIDER); + + if (showRouting) + { + addShowInformation(hex, ascii, msg, "Routing Details", true, false, false); + } + if (showHeaders) + { + addShowInformation(hex, ascii, msg, "Headers", false, true, false); + } + if (showMessageHeaders) + { + addShowInformation(hex, ascii, msg, null, false, false, true); + } + + // Add Content Body section + hex.add("Content Body"); + ascii.add(""); + hex.add(Console.ROW_DIVIDER); + ascii.add(Console.ROW_DIVIDER); + + + final int messageSize = (int) msg.getSize(); + if (messageSize != 0) + { + hex.add("Hex"); + hex.add(Console.ROW_DIVIDER); + + + ascii.add("ASCII"); + ascii.add(Console.ROW_DIVIDER); + + java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(64 * 1024); + + int position = 0; + + while(position < messageSize) + { + + position += msg.getContent(buf, position); + buf.flip(); + //Duplicate so we don't destroy original data :) + java.nio.ByteBuffer hexBuffer = buf; + + java.nio.ByteBuffer charBuffer = hexBuffer.duplicate(); + + Hex hexencoder = new Hex(); + + while (hexBuffer.hasRemaining()) + { + byte[] line = new byte[LINE_SIZE]; + + int bufsize = hexBuffer.remaining(); + if (bufsize < LINE_SIZE) + { + hexBuffer.get(line, 0, bufsize); + } + else + { + bufsize = line.length; + hexBuffer.get(line); + } + + byte[] encoded = hexencoder.encode(line); + + try + { + String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING); + String hexLine = ""; + + int strLength = encStr.length(); + for (int c = 0; c < strLength; c++) + { + hexLine += encStr.charAt(c); + + if ((c & 1) == 1 && SPACE_BYTES) + { + hexLine += BYTE_SPACER; + } + } + + hex.add(hexLine); + } + catch (UnsupportedEncodingException e) + { + _console.println(e.getMessage()); + return null; + } + } + + while (charBuffer.hasRemaining()) + { + String asciiLine = ""; + + for (int pos = 0; pos < LINE_SIZE; pos++) + { + if (charBuffer.hasRemaining()) + { + byte ch = charBuffer.get(); + + if (isPrintable(ch)) + { + asciiLine += (char) ch; + } + else + { + asciiLine += NON_PRINTING_ASCII_CHAR; + } + + if (SPACE_BYTES) + { + asciiLine += BYTE_SPACER; + } + } + else + { + break; + } + } + + ascii.add(asciiLine); + } + buf.clear(); + } + } + else + { + List<String> result = new LinkedList<String>(); + + display.add(result); + result.add("No ContentBodies"); + } + } + + // if hex is empty then we have no data to display + if (hex.size() == 0) + { + return null; + } + + return display; + } + + private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg, + String title, boolean routing, boolean headers, boolean messageHeaders) + { + List<QueueEntry> single = new LinkedList<QueueEntry>(); + single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE)); + + List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders); + + //Reformat data + if (title != null) + { + column1.add(title); + column2.add(""); + column1.add(Console.ROW_DIVIDER); + column2.add(Console.ROW_DIVIDER); + } + + // look at all columns in the routing Data + for (List item : routingData) + { + // the item should be: + // Title + // *divider + // value + // otherwise we can't reason about the correct value + if (item.size() == 3) + { + //Filter out the columns we are not interested in. + + String columnName = item.get(0).toString(); + + if (!(columnName.equals(Show.Columns.ID.name()) + || columnName.equals(Show.Columns.Size.name()))) + { + column1.add(columnName); + column2.add(item.get(2).toString()); + } + } + } + column1.add(Console.ROW_DIVIDER); + column2.add(Console.ROW_DIVIDER); + } + + private boolean isPrintable(byte c) + { + return c > 31 && c < 127; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java new file mode 100644 index 0000000000..0f9546541b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.util.LinkedList; +import java.util.Map; + +public class Help extends AbstractCommand +{ + public Help(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Provides detailed help on commands."; + } + + public String getCommand() + { + return "help"; + } + + public String usage() + { + return "help [<command>]"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length > 1) + { + Command command = _tool.getCommands().get(args[1]); + if (command != null) + { + _console.println(command.help()); + _console.println("Usage:" + command.usage()); + } + else + { + commandError("Command not found: ", args); + } + } + else + { + java.util.List<java.util.List> data = new LinkedList<java.util.List>(); + + java.util.List<String> commandName = new LinkedList<String>(); + java.util.List<String> commandDescription = new LinkedList<String>(); + + data.add(commandName); + data.add(commandDescription); + + //Set up Headers + commandName.add("Command"); + commandDescription.add("Description"); + + commandName.add(Console.ROW_DIVIDER); + commandDescription.add(Console.ROW_DIVIDER); + + //Add current Commands with descriptions + Map<String, Command> commands = _tool.getCommands(); + + for (Command command : commands.values()) + { + commandName.add(command.getCommand()); + commandDescription.add(command.help()); + } + + _console.printMap("Available Commands", data); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java new file mode 100644 index 0000000000..3c4a0c8fac --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.util.Collection; +import java.util.LinkedList; + +public class List extends AbstractCommand +{ + + public List(MessageStoreTool tool) + { + super(tool); + } + + public void setOutput(Console out) + { + _console = out; + } + + public String help() + { + return "list available items."; + } + + public String usage() + { + return "list queues [<exchange>] | exchanges | bindings [<exchange>] | all"; + } + + public String getCommand() + { + return "list"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length > 1) + { + if ((args[1].equals("exchanges")) + || (args[1].equals("queues")) + || (args[1].equals("bindings")) + || (args[1].equals("all"))) + { + if (args.length == 2) + { + doList(args[1]); + } + else if (args.length == 3) + { + doList(args[1], args[2]); + } + } + else + { + commandError("Unknown options. ", args); + } + } + else if (args.length < 2) + { + doList("all"); + } + else + { + doList(args[1]); + } + } + + private void doList(String... listItem) + { + if (_tool.getState().getVhost() == null) + { + _console.println("No Virtualhost open. Open a Virtualhost first."); + listVirtualHosts(); + return; + } + + VirtualHost vhost = _tool.getState().getVhost(); + + java.util.List<String> data = null; + + if (listItem[0].equals("queues")) + { + if (listItem.length > 1) + { + data = listQueues(vhost, new AMQShortString(listItem[1])); + } + else + { + Exchange exchange = _tool.getState().getExchange(); + data = listQueues(vhost, exchange); + } + } + + if (listItem[0].equals("exchanges")) + { + data = listExchanges(vhost); + } + + if (listItem[0].equals("bindings")) + { + + if (listItem.length > 1) + { + data = listBindings(vhost, new AMQShortString(listItem[1])); + } + else + { + Exchange exchange = _tool.getState().getExchange(); + + data = listBindings(vhost, exchange); + } + } + + if (data != null) + { + if (data.size() == 1) + { + _console.println("No '" + listItem[0] + "' to display,"); + } + else + { + _console.displayList(true, data.toArray(new String[0])); + } + } + + + if (listItem[0].equals("all")) + { + + boolean displayed = false; + Exchange exchange = _tool.getState().getExchange(); + + //Do the display here for each one so that they are pretty printed + data = listQueues(vhost, exchange); + if (data != null) + { + displayed = true; + _console.displayList(true, data.toArray(new String[0])); + } + + if (exchange == null) + { + data = listExchanges(vhost); + if (data != null) + { + displayed = true; + _console.displayList(true, data.toArray(new String[0])); + } + } + + data = listBindings(vhost, exchange); + if (data != null) + { + displayed = true; + _console.displayList(true, data.toArray(new String[0])); + } + + if (!displayed) + { + _console.println("Nothing to list"); + } + } + } + + private void listVirtualHosts() + { + Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance() + .getVirtualHostRegistry().getVirtualHosts(); + + String[] data = new String[vhosts.size() + 1]; + + data[0] = "Available VirtualHosts"; + + int index = 1; + for (VirtualHost vhost : vhosts) + { + data[index] = vhost.getName(); + index++; + } + + _console.displayList(true, data); + } + + private java.util.List<String> listBindings(VirtualHost vhost, AMQShortString exchangeName) + { + return listBindings(vhost, vhost.getExchangeRegistry().getExchange(exchangeName)); + } + + private java.util.List<String> listBindings(VirtualHost vhost, Exchange exchange) + { + Collection<AMQShortString> queues = vhost.getQueueRegistry().getQueueNames(); + + if (queues == null || queues.size() == 0) + { + return null; + } + + java.util.List<String> data = new LinkedList<String>(); + + data.add("Current Bindings"); + + for (AMQShortString queue : queues) + { + if (exchange != null) + { + if (exchange.isBound(queue)) + { + data.add(queue.toString()); + } + } + else + { + data.add(queue.toString()); + } + } + + return data; + } + + private java.util.List<String> listExchanges(VirtualHost vhost) + { + Collection<AMQShortString> queues = vhost.getExchangeRegistry().getExchangeNames(); + + if (queues == null || queues.size() == 0) + { + return null; + } + + java.util.List<String> data = new LinkedList<String>(); + + data.add("Available Exchanges"); + + for (AMQShortString queue : queues) + { + data.add(queue.toString()); + } + + return data; + } + + private java.util.List<String> listQueues(VirtualHost vhost, AMQShortString exchangeName) + { + return listQueues(vhost, vhost.getExchangeRegistry().getExchange(exchangeName)); + } + + private java.util.List<String> listQueues(VirtualHost vhost, Exchange exchange) + { + Collection<AMQQueue> queues = vhost.getQueueRegistry().getQueues(); + + if (queues == null || queues.size() == 0) + { + return null; + } + + java.util.List<String> data = new LinkedList<String>(); + + data.add("Available Queues"); + + for (AMQQueue queue : queues) + { + if (exchange != null) + { + if (exchange.isBound(queue)) + { + data.add(queue.getNameShortString().toString()); + } + } + else + { + data.add(queue.getNameShortString().toString()); + } + } + + if (exchange != null) + { + if (queues.size() == 1) + { + return null; + } + } + + return data; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java new file mode 100644 index 0000000000..244a311c30 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.configuration.Configuration; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Load extends AbstractCommand +{ + public Load(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Loads specified broker configuration file."; + } + + public String usage() + { + return "load <configuration file>"; + } + + public String getCommand() + { + return "load"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length > 2) + { + _console.print("load " + args[1] + ": additional options not understood:"); + for (int i = 2; i < args.length; i++) + { + _console.print(args[i] + " "); + } + _console.println(""); + } + else if (args.length < 2) + { + _console.println("Enter Configuration file."); + String input = _console.readln(); + if (input != null) + { + doLoad(input); + } + else + { + _console.println("Did not recognise config file."); + } + } + else + { + doLoad(args[1]); + } + } + + private void doLoad(String configfile) + { + _console.println("Loading Configuration:" + configfile); + + try + { + _tool.setConfigurationFile(configfile); + } + catch (Configuration.InitException e) + { + _console.println("Unable to open config file due to: '" + e.getMessage() + "'"); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java new file mode 100644 index 0000000000..615f6ec1c2 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +import java.util.LinkedList; +import java.util.List; + +public class Move extends AbstractCommand +{ + + public Move(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Move messages between queues.";/*\n" + + "The currently selected message set will be moved to the specifed queue.\n" + + "Alternatively the values can be provided on the command line.";*/ + } + + public String usage() + { + return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]"; + } + + public String getCommand() + { + return "move"; + } + + public void execute(String... args) + { + AMQQueue toQueue = null; + AMQQueue fromQueue = _tool.getState().getQueue(); + java.util.List<Long> msgids = _tool.getState().getMessages(); + + if (args.length >= 2) + { + for (String arg : args) + { + if (arg.startsWith("to=")) + { + String queueName = arg.substring(arg.indexOf("=") + 1); + toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + } + + if (arg.startsWith("from=")) + { + String queueName = arg.substring(arg.indexOf("=") + 1); + fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + } + + if (arg.startsWith("msgids=")) + { + String msgidStr = arg.substring(arg.indexOf("=") + 1); + + // Record the current message selection + java.util.List<Long> currentIDs = _tool.getState().getMessages(); + + // Use the ToolState class to perform the messasge parsing + _tool.getState().setMessages(msgidStr); + msgids = _tool.getState().getMessages(); + + // Reset the original selection of messages + _tool.getState().setMessages(currentIDs); + } + } + } + + if (!checkRequirements(fromQueue, toQueue, msgids)) + { + return; + } + + processIDs(fromQueue, toQueue, msgids); + } + + private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) + { + Long previous = null; + Long start = null; + + if (msgids == null) + { + msgids = allMessageIDs(fromQueue); + } + + if (msgids == null || msgids.size() == 0) + { + _console.println("No Messages to move."); + return; + } + + for (long id : msgids) + { + if (previous != null) + { + if (id == previous + 1) + { + if (start == null) + { + start = previous; + } + } + else + { + if (start != null) + { + //move a range of ids + doCommand(fromQueue, start, id, toQueue); + start = null; + } + else + { + //move a single id + doCommand(fromQueue, id, id, toQueue); + } + } + } + + previous = id; + } + + if (start != null) + { + //move a range of ids + doCommand(fromQueue, start, previous, toQueue); + } + } + + private List<Long> allMessageIDs(AMQQueue fromQueue) + { + List<Long> ids = new LinkedList<Long>(); + + if (fromQueue != null) + { + List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue(); + if (messages != null) + { + for (QueueEntry msg : messages) + { + ids.add(msg.getMessage().getMessageNumber()); + } + } + } + + return ids; + } + + protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids) + { + if (toQueue == null) + { + _console.println("Destination queue not specifed."); + _console.println(usage()); + return false; + } + + if (fromQueue == null) + { + _console.println("Source queue not specifed."); + _console.println(usage()); + return false; + } + + return true; + } + + protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue) + { + ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog()); + fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getNameShortString().toString(), txn); + txn.commit(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java new file mode 100644 index 0000000000..8df4afa2db --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.server.queue.AMQQueue; + +public class Purge extends Move +{ + public Purge(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Purge messages from a queue.\n" + + "The currently selected message set will be purged from the specifed queue.\n" + + "Alternatively the values can be provided on the command line."; + } + + public String usage() + { + return "purge from=<queue> [msgids=<msgids eg, 1,2,4-10>]"; + } + + public String getCommand() + { + return "purge"; + } + + + protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) + { + if (fromQueue == null) + { + _console.println("Source queue not specifed."); + _console.println(usage()); + return false; + } + + return true; + } + + protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue) + { + fromQueue.removeMessagesFromQueue(start, end); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java new file mode 100644 index 0000000000..a81bc07c38 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Quit extends AbstractCommand +{ + public Quit(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Quit the tool."; + } + + public String usage() + { + return "quit"; + } + + public String getCommand() + { + return "quit"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals("quit"); + + _tool.quit(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java new file mode 100644 index 0000000000..ff59568374 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +import java.util.LinkedList; +import java.util.StringTokenizer; + +public class Select extends AbstractCommand +{ + + public Select(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Perform a selection."; + } + + public String usage() + { + return "select virtualhost <name> |exchange <name> |queue <name> | msg id=<msgids eg. 1,2,4-10>"; + } + + public String getCommand() + { + return "select"; + } + + public void execute(String... args) + { + assert args.length > 2; + assert args[0].equals("select"); + + if (args.length < 3) + { + if (args[1].equals("show")) + { + doSelect(args[1], null); + } + else + { + _console.print("select : unknown command:"); + _console.println(help()); + } + } + else + { + if (args[1].equals("virtualhost") + || args[1].equals("vhost") + || args[1].equals("exchange") + || args[1].equals("queue") + || args[1].equals("msg") + ) + { + doSelect(args[1], args[2]); + } + else + { + _console.println(help()); + } + } + } + + private void doSelect(String type, String item) + { + if (type.equals("virtualhost")) + { + + VirtualHost vhost = ApplicationRegistry.getInstance() + .getVirtualHostRegistry().getVirtualHost(item); + + if (vhost == null) + { + _console.println("Virtualhost '" + item + "' not found."); + } + else + { + _tool.getState().setVhost(vhost); + } + } + + if (type.equals("exchange")) + { + + VirtualHost vhost = _tool.getState().getVhost(); + + if (vhost == null) + { + _console.println("No Virtualhost open. Open a Virtualhost first."); + return; + } + + + Exchange exchange = vhost.getExchangeRegistry().getExchange(new AMQShortString(item)); + + if (exchange == null) + { + _console.println("Exchange '" + item + "' not found."); + } + else + { + _tool.getState().setExchange(exchange); + } + + if (_tool.getState().getQueue() != null) + { + if (!exchange.isBound(_tool.getState().getQueue())) + { + _tool.getState().setQueue(null); + } + } + } + + if (type.equals("queue")) + { + VirtualHost vhost = _tool.getState().getVhost(); + + if (vhost == null) + { + _console.println("No Virtualhost open. Open a Virtualhost first."); + return; + } + + AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(item)); + + if (queue == null) + { + _console.println("Queue '" + item + "' not found."); + } + else + { + _tool.getState().setQueue(queue); + + if (_tool.getState().getExchange() == null) + { + for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames()) + { + Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + if (exchange.isBound(queue)) + { + _tool.getState().setExchange(exchange); + break; + } + } + } + + //remove the message selection + _tool.getState().setMessages((java.util.List<Long>) null); + } + } + + if (type.equals("msg")) + { + if (item.startsWith("id=")) + { + StringTokenizer tok = new StringTokenizer(item.substring(item.indexOf("=") + 1), ","); + + java.util.List<Long> msgids = null; + + if (tok.hasMoreTokens()) + { + msgids = new LinkedList<Long>(); + } + + while (tok.hasMoreTokens()) + { + String next = tok.nextToken(); + if (next.contains("-")) + { + Long start = Long.parseLong(next.substring(0, next.indexOf("-"))); + Long end = Long.parseLong(next.substring(next.indexOf("-") + 1)); + + if (end >= start) + { + for (long l = start; l <= end; l++) + { + msgids.add(l); + } + } + } + else + { + msgids.add(Long.parseLong(next)); + } + } + + _tool.getState().setMessages(msgids); + } + + } + + if (type.equals("show")) + { + _console.println(_tool.getState().toString()); + if (_tool.getState().getMessages() != null) + { + _console.print("Msgs:"); + for (Long l : _tool.getState().getMessages()) + { + _console.print(" " + l); + } + _console.println(""); + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java new file mode 100644 index 0000000000..806e161bbc --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.util.LinkedList; +import java.util.List; + +public class Show extends AbstractCommand +{ + protected boolean _amqHeaders = false; + protected boolean _routing = false; + protected boolean _msgHeaders = false; + + public Show(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Shows the messages headers."; + } + + public String usage() + { + return getCommand() + " [show=[all],[msgheaders],[amqheaders],[routing]] [id=<msgid e.g. 1,2,4-10>]"; + } + + public String getCommand() + { + return "show"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length < 2) + { + parseArgs("all"); + } + else + { + parseArgs(args); + } + + performShow(); + } + + protected void parseArgs(String... args) + { + List<Long> msgids = null; + + if (args.length >= 2) + { + for (String arg : args) + { + if (arg.startsWith("show=")) + { + _msgHeaders = arg.contains("msgheaders") || arg.contains("all"); + _amqHeaders = arg.contains("amqheaders") || arg.contains("all"); + _routing = arg.contains("routing") || arg.contains("all"); + } + + if (arg.startsWith("id=")) + { + _tool.getState().setMessages(msgids); + } + }//for args + }// if args > 2 + } + + protected void performShow() + { + if (_tool.getState().getVhost() == null) + { + _console.println("No Virtualhost selected. 'DuSelect' a Virtualhost first."); + return; + } + + AMQQueue _queue = _tool.getState().getQueue(); + + List<Long> msgids = _tool.getState().getMessages(); + + if (_queue != null) + { + List<QueueEntry> messages = _queue.getMessagesOnTheQueue(); + if (messages == null || messages.size() == 0) + { + _console.println("No messages on queue"); + return; + } + + List<List> data = createMessageData(msgids, messages, _amqHeaders, _routing, _msgHeaders); + if (data != null) + { + _console.printMap(null, data); + } + else + { + String message = "No data to display."; + if (msgids != null) + { + message += " Is message selection correct? " + _tool.getState().printMessages(); + } + _console.println(message); + } + + } + else + { + _console.println("No Queue specified to show."); + } + } + + /** + * Create the list data for display from the messages. + * + * @param msgids The list of message ids to display + * @param messages A list of messages to format and display. + * @param showHeaders should the header info be shown + * @param showRouting show the routing info be shown + * @param showMessageHeaders show the msg headers be shown + * @return the formated data lists for printing + */ + protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting, + boolean showMessageHeaders) + { + + // Currenly exposed message properties +// //Printing the content Body +// msg.getContentBodyIterator(); +// //Print the Headers +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppId(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppIdAsString(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getClusterId(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getContentType(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getCorrelationId(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getDeliveryMode(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageNumber(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getTimestamp(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getType(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getUserId(); +// +// //Print out all the property names +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames(); +// +// msg.getMessageNumber(); +// msg.getSize(); +// msg.getArrivalTime(); + +// msg.getDeliveredSubscription(); +// msg.getDeliveredToConsumer(); +// msg.getMessageHandle(); +// msg.getMessageNumber(); +// msg.getMessagePublishInfo(); +// msg.getPublisher(); + +// msg.getStoreContext(); +// msg.isAllContentReceived(); +// msg.isPersistent(); +// msg.isRedelivered(); +// msg.isRejectedBy(); +// msg.isTaken(); + + //Header setup + + List<List> data = new LinkedList<List>(); + + List<String> id = new LinkedList<String>(); + data.add(id); + id.add(Columns.ID.name()); + id.add(Console.ROW_DIVIDER); + + List<String> exchange = new LinkedList<String>(); + List<String> routingkey = new LinkedList<String>(); + List<String> immediate = new LinkedList<String>(); + List<String> mandatory = new LinkedList<String>(); + if (showRouting) + { + data.add(exchange); + exchange.add(Columns.Exchange.name()); + exchange.add(Console.ROW_DIVIDER); + + data.add(routingkey); + routingkey.add(Columns.RoutingKey.name()); + routingkey.add(Console.ROW_DIVIDER); + + data.add(immediate); + immediate.add(Columns.isImmediate.name()); + immediate.add(Console.ROW_DIVIDER); + + data.add(mandatory); + mandatory.add(Columns.isMandatory.name()); + mandatory.add(Console.ROW_DIVIDER); + } + + List<String> size = new LinkedList<String>(); + List<String> appid = new LinkedList<String>(); + List<String> clusterid = new LinkedList<String>(); + List<String> contenttype = new LinkedList<String>(); + List<String> correlationid = new LinkedList<String>(); + List<String> deliverymode = new LinkedList<String>(); + List<String> encoding = new LinkedList<String>(); + List<String> arrival = new LinkedList<String>(); + List<String> expiration = new LinkedList<String>(); + List<String> priority = new LinkedList<String>(); + List<String> propertyflag = new LinkedList<String>(); + List<String> replyto = new LinkedList<String>(); + List<String> timestamp = new LinkedList<String>(); + List<String> type = new LinkedList<String>(); + List<String> userid = new LinkedList<String>(); + List<String> ispersitent = new LinkedList<String>(); + List<String> isredelivered = new LinkedList<String>(); + List<String> isdelivered = new LinkedList<String>(); + + data.add(size); + size.add(Columns.Size.name()); + size.add(Console.ROW_DIVIDER); + + if (showHeaders) + { + data.add(ispersitent); + ispersitent.add(Columns.isPersistent.name()); + ispersitent.add(Console.ROW_DIVIDER); + + data.add(isredelivered); + isredelivered.add(Columns.isRedelivered.name()); + isredelivered.add(Console.ROW_DIVIDER); + + data.add(isdelivered); + isdelivered.add(Columns.isDelivered.name()); + isdelivered.add(Console.ROW_DIVIDER); + + data.add(appid); + appid.add(Columns.App_ID.name()); + appid.add(Console.ROW_DIVIDER); + + data.add(clusterid); + clusterid.add(Columns.Cluster_ID.name()); + clusterid.add(Console.ROW_DIVIDER); + + data.add(contenttype); + contenttype.add(Columns.Content_Type.name()); + contenttype.add(Console.ROW_DIVIDER); + + data.add(correlationid); + correlationid.add(Columns.Correlation_ID.name()); + correlationid.add(Console.ROW_DIVIDER); + + data.add(deliverymode); + deliverymode.add(Columns.Delivery_Mode.name()); + deliverymode.add(Console.ROW_DIVIDER); + + data.add(encoding); + encoding.add(Columns.Encoding.name()); + encoding.add(Console.ROW_DIVIDER); + + data.add(arrival); + expiration.add(Columns.Arrival.name()); + expiration.add(Console.ROW_DIVIDER); + + data.add(expiration); + expiration.add(Columns.Expiration.name()); + expiration.add(Console.ROW_DIVIDER); + + data.add(priority); + priority.add(Columns.Priority.name()); + priority.add(Console.ROW_DIVIDER); + + data.add(propertyflag); + propertyflag.add(Columns.Property_Flag.name()); + propertyflag.add(Console.ROW_DIVIDER); + + data.add(replyto); + replyto.add(Columns.ReplyTo.name()); + replyto.add(Console.ROW_DIVIDER); + + data.add(timestamp); + timestamp.add(Columns.Timestamp.name()); + timestamp.add(Console.ROW_DIVIDER); + + data.add(type); + type.add(Columns.Type.name()); + type.add(Console.ROW_DIVIDER); + + data.add(userid); + userid.add(Columns.UserID.name()); + userid.add(Console.ROW_DIVIDER); + } + + List<String> msgHeaders = new LinkedList<String>(); + if (showMessageHeaders) + { + data.add(msgHeaders); + msgHeaders.add(Columns.MsgHeaders.name()); + msgHeaders.add(Console.ROW_DIVIDER); + } + + //Add create the table of data + for (QueueEntry entry : messages) + { + ServerMessage msg = entry.getMessage(); + if (!includeMsg(msg, msgids)) + { + continue; + } + + id.add(msg.getMessageNumber().toString()); + + size.add("" + msg.getSize()); + + arrival.add("" + msg.getArrivalTime()); + + ispersitent.add(msg.isPersistent() ? "true" : "false"); + + + isredelivered.add(entry.isRedelivered() ? "true" : "false"); + + isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false"); + +// msg.getMessageHandle(); + + BasicContentHeaderProperties headers = null; + + try + { + if(msg instanceof AMQMessage) + { + headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties()); + } + } + catch (AMQException e) + { + //ignore +// commandError("Unable to read properties for message: " + e.getMessage(), null); + } + + if (headers != null) + { + String appidS = headers.getAppIdAsString(); + appid.add(appidS == null ? "null" : appidS); + + String clusterS = headers.getClusterIdAsString(); + clusterid.add(clusterS == null ? "null" : clusterS); + + String contentS = headers.getContentTypeAsString(); + contenttype.add(contentS == null ? "null" : contentS); + + String correlationS = headers.getCorrelationIdAsString(); + correlationid.add(correlationS == null ? "null" : correlationS); + + deliverymode.add("" + headers.getDeliveryMode()); + + AMQShortString encodeSS = headers.getEncoding(); + encoding.add(encodeSS == null ? "null" : encodeSS.toString()); + + expiration.add("" + headers.getExpiration()); + + FieldTable headerFT = headers.getHeaders(); + msgHeaders.add(headerFT == null ? "none" : "" + headerFT.toString()); + + priority.add("" + headers.getPriority()); + propertyflag.add("" + headers.getPropertyFlags()); + + AMQShortString replytoSS = headers.getReplyTo(); + replyto.add(replytoSS == null ? "null" : replytoSS.toString()); + + timestamp.add("" + headers.getTimestamp()); + + AMQShortString typeSS = headers.getType(); + type.add(typeSS == null ? "null" : typeSS.toString()); + + AMQShortString useridSS = headers.getUserId(); + userid.add(useridSS == null ? "null" : useridSS.toString()); + + MessagePublishInfo info = null; + try + { + if(msg instanceof AMQMessage) + { + info = ((AMQMessage)msg).getMessagePublishInfo(); + } + + } + catch (AMQException e) + { + //ignore + } + + if (info != null) + { + AMQShortString exchangeSS = info.getExchange(); + exchange.add(exchangeSS == null ? "null" : exchangeSS.toString()); + + AMQShortString routingkeySS = info.getRoutingKey(); + routingkey.add(routingkeySS == null ? "null" : routingkeySS.toString()); + + immediate.add(info.isImmediate() ? "true" : "false"); + mandatory.add(info.isMandatory() ? "true" : "false"); + } + +// msg.getPublisher(); -- only used in clustering +// msg.getStoreContext(); +// msg.isAllContentReceived(); + + }// if headers!=null + +// need to access internal map and do lookups. +// msg.isTaken(); +// msg.getDeliveredSubscription(); +// msg.isRejectedBy(); + + } + + // if id only had the header and the divider in it then we have no data to display + if (id.size() == 2) + { + return null; + } + return data; + } + + protected boolean includeMsg(ServerMessage msg, List<Long> msgids) + { + if (msgids == null) + { + return true; + } + + Long msgid = msg.getMessageNumber(); + + boolean found = false; + + if (msgids != null) + { + //check msgid is in msgids + for (Long l : msgids) + { + if (l.equals(msgid)) + { + found = true; + break; + } + } + } + return found; + } + + public enum Columns + { + ID, + Size, + Exchange, + RoutingKey, + isImmediate, + isMandatory, + isPersistent, + isRedelivered, + isDelivered, + App_ID, + Cluster_ID, + Content_Type, + Correlation_ID, + Delivery_Mode, + Encoding, + Arrival, + Expiration, + Priority, + Property_Flag, + ReplyTo, + Timestamp, + Type, + UserID, + MsgHeaders + } +} + + |