diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 88 |
1 files changed, 10 insertions, 78 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d63d1946d3..87d11a892e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1355,93 +1355,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - if(_alternateExchange != null) + + for(final QueueEntry entry : entries) { + // TODO log requeues with a post enqueue action + int requeues = entry.routeToAlternate(null, txn); - for(final QueueEntry entry : entries) + if(requeues == 0) { - - List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); - if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null) - { - queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties()); - } - - final ServerMessage message = entry.getMessage(); - if(queues != null && queues.size() != 0) - { - final List<? extends BaseQueue> rerouteQueues = queues; - txn.enqueue(rerouteQueues, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - - } - }); - txn.dequeue(this, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.delete(); - } - - public void onRollback() - { - } - }); - } - + // TODO log discard } - - _alternateExchange.removeReference(this); } - else - { - // TODO log discard - - for(final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - if(message != null) - { - txn.dequeue(this, message, - new ServerTransaction.Action() - { - public void postCommit() - { - entry.delete(); - } + txn.commit(); - public void onRollback() - { - } - }); - } - } + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); } - txn.commit(); for (Task task : _deleteTaskList) { |