summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java88
1 files changed, 10 insertions, 78 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d63d1946d3..87d11a892e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1355,93 +1355,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- if(_alternateExchange != null)
+
+ for(final QueueEntry entry : entries)
{
+ // TODO log requeues with a post enqueue action
+ int requeues = entry.routeToAlternate(null, txn);
- for(final QueueEntry entry : entries)
+ if(requeues == 0)
{
-
- List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
- if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
- {
- queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties());
- }
-
- final ServerMessage message = entry.getMessage();
- if(queues != null && queues.size() != 0)
- {
- final List<? extends BaseQueue> rerouteQueues = queues;
- txn.enqueue(rerouteQueues, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- try
- {
- for(BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void onRollback()
- {
-
- }
- });
- txn.dequeue(this, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.delete();
- }
-
- public void onRollback()
- {
- }
- });
- }
-
+ // TODO log discard
}
-
- _alternateExchange.removeReference(this);
}
- else
- {
- // TODO log discard
-
- for(final QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
- if(message != null)
- {
- txn.dequeue(this, message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- entry.delete();
- }
+ txn.commit();
- public void onRollback()
- {
- }
- });
- }
- }
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
}
- txn.commit();
for (Task task : _deleteTaskList)
{