diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 08:35:35 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-07-17 08:35:35 +0000 |
commit | bc77d792e85c348163d24a609f05f3772a7ae950 (patch) | |
tree | de1542dfb4d0652df4082c6620c6ff6d3972fc7f | |
parent | 8a4f51a91860724aefe63049268567e99d822ddd (diff) | |
download | qpid-python-bc77d792e85c348163d24a609f05f3772a7ae950.tar.gz |
Update to the MessageStore Tool to provide Move and Purge functionality.
Updated to remove the AMQExceptions that will be removed from the Exchange class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@556847 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 247 insertions, 138 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java index 831cb90a4d..96ecb36952 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java @@ -20,20 +20,12 @@ */ package org.apache.qpid.tools.messagestore.commands; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.tools.messagestore.MessageStoreTool; -public class Copy extends AbstractCommand +public class Copy extends Move { - - /** - * Since the Coopy command is not associated with a real channel we can safely create our own store context - * for use in the few methods that require one. - */ - private StoreContext _storeContext = new StoreContext(); - public Copy(MessageStoreTool tool) { super(tool); @@ -56,94 +48,9 @@ public class Copy extends AbstractCommand return "copy"; } - public void execute(String... args) + protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext) { - AMQQueue toQueue = null; - AMQQueue fromQueue = _tool.getState().getQueue(); - java.util.List<Long> msgids = _tool.getState().getMessages(); - - if (args.length >= 2) - { - for (String arg : args) - { - if (arg.startsWith("to=")) - { - String queueName = arg.substring(arg.indexOf("=") + 1); - toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - } - - if (arg.startsWith("from=")) - { - String queueName = arg.substring(arg.indexOf("=") + 1); - fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); - } - - if (arg.startsWith("msgids=")) - { - String msgidStr = arg.substring(arg.indexOf("=") + 1); - - // Record the current message selection - java.util.List<Long> currentIDs = _tool.getState().getMessages(); - - // Use the ToolState class to perform the messasge parsing - _tool.getState().setMessages(msgidStr); - msgids = _tool.getState().getMessages(); - - // Reset the original selection of messages - _tool.getState().setMessages(currentIDs); - } - } - } - - if (toQueue == null) - { - _console.println("Queue to copy to not specifed."); - _console.println(usage()); - return; - } - - if (fromQueue == null) - { - _console.println("Queue to copy from not specifed."); - _console.println(usage()); - return; - } - - performCopy(fromQueue, toQueue, msgids); + fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), storeContext); } - protected void performCopy(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) - { - Long previous = null; - Long start = null; - - for (long id : msgids) - { - if (previous != null) - { - if (id == previous + 1) - { - if (start == null) - { - start = previous; - } - } - else - { - if (start != null) - { - //move a range of ids - fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext); - } - else - { - //move a single id - fromQueue.moveMessagesToAnotherQueue(id, id, toQueue.getName().toString(), _storeContext); - } - } - } - - previous = id; - } - } } diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java index 8a42718542..df8b59ec19 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.tools.messagestore.commands; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; @@ -234,17 +233,9 @@ public class List extends AbstractCommand { if (exchange != null) { - try + if (exchange.isBound(queue)) { - if (exchange.isBound(queue)) - { - data.add(queue.toString()); - } - } - catch (AMQException e) - { - // is never thrown by current impls forced to throw by interface. - commandError("Unable to check exchange bindings: " + e.getMessage(), null); + data.add(queue.toString()); } } else @@ -299,17 +290,9 @@ public class List extends AbstractCommand { if (exchange != null) { - try - { - if (exchange.isBound(queue)) - { - data.add(queue.getName().toString()); - } - } - catch (AMQException e) + if (exchange.isBound(queue)) { - // is never thrown by current impls forced to throw by interface. - commandError("Unable to check exchange bindings: " + e.getMessage(), null); + data.add(queue.getName().toString()); } } else diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java new file mode 100644 index 0000000000..a9497fd23e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +import java.util.List; + +public class Move extends AbstractCommand +{ + + /** + * Since the Coopy command is not associated with a real channel we can safely create our own store context + * for use in the few methods that require one. + */ + private StoreContext _storeContext = new StoreContext(); + + public Move(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Move messages between queues.\n" + + "The currently selected message set will be moved to the specifed queue.\n" + + "Alternatively the values can be provided on the command line."; + } + + public String usage() + { + return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]"; + } + + public String getCommand() + { + return "move"; + } + + public void execute(String... args) + { + AMQQueue toQueue = null; + AMQQueue fromQueue = _tool.getState().getQueue(); + java.util.List<Long> msgids = _tool.getState().getMessages(); + + if (args.length >= 2) + { + for (String arg : args) + { + if (arg.startsWith("to=")) + { + String queueName = arg.substring(arg.indexOf("=") + 1); + toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + } + + if (arg.startsWith("from=")) + { + String queueName = arg.substring(arg.indexOf("=") + 1); + fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + } + + if (arg.startsWith("msgids=")) + { + String msgidStr = arg.substring(arg.indexOf("=") + 1); + + // Record the current message selection + java.util.List<Long> currentIDs = _tool.getState().getMessages(); + + // Use the ToolState class to perform the messasge parsing + _tool.getState().setMessages(msgidStr); + msgids = _tool.getState().getMessages(); + + // Reset the original selection of messages + _tool.getState().setMessages(currentIDs); + } + } + } + + if (!checkRequirements(fromQueue, toQueue, msgids)) + { + return; + } + + processIDs(fromQueue, toQueue, msgids); + } + + private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) + { + Long previous = null; + Long start = null; + + for (long id : msgids) + { + if (previous != null) + { + if (id == previous + 1) + { + if (start == null) + { + start = previous; + } + } + else + { + if (start != null) + { + //move a range of ids + doCommand(fromQueue, start, id, toQueue, _storeContext); + } + else + { + //move a single id + doCommand(fromQueue, id, id, toQueue, _storeContext); + } + } + } + + previous = id; + } + } + + protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids) + { + if (toQueue == null) + { + _console.println("Destination queue not specifed."); + _console.println(usage()); + return false; + } + + if (fromQueue == null) + { + _console.println("Source queue not specifed."); + _console.println(usage()); + return false; + } + + return true; + } + + protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue, StoreContext storeContext) + { + fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java new file mode 100644 index 0000000000..7154159b40 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Purge extends Move +{ + public Purge(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Purge messages from a queue.\n" + + "The currently selected message set will be purged from the specifed queue.\n" + + "Alternatively the values can be provided on the command line."; + } + + public String usage() + { + return "purge from=<queue> [msgids=<msgids eg, 1,2,4-10>]"; + } + + public String getCommand() + { + return "purge"; + } + + + protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) + { + if (fromQueue == null) + { + _console.println("Source queue not specifed."); + _console.println(usage()); + return false; + } + + return true; + } + + protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext) + { + fromQueue.removeMessagesFromQueue(start, end, storeContext); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java index 39f35da912..5e9b7028e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.tools.messagestore.commands; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; @@ -132,16 +131,9 @@ public class Select extends AbstractCommand if (_tool.getState().getQueue() != null) { - try + if (!exchange.isBound(_tool.getState().getQueue())) { - if (!exchange.isBound(_tool.getState().getQueue())) - { - _tool.getState().setQueue(null); - } - } - catch (AMQException e) - { - //ignore + _tool.getState().setQueue(null); } } } @@ -170,18 +162,11 @@ public class Select extends AbstractCommand { for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames()) { - try - { - Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); - if (exchange.isBound(queue)) - { - _tool.getState().setExchange(exchange); - break; - } - } - catch (AMQException e) + Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + if (exchange.isBound(queue)) { - //ignore error + _tool.getState().setExchange(exchange); + break; } } } |