diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 67 |
1 files changed, 22 insertions, 45 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ed61f1acf6..461d493437 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.txn.ServerTransaction; import java.util.EnumMap; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -250,7 +249,7 @@ public abstract class QueueEntryImpl implements QueueEntry } else if(acquire()) { - routeToAlternate(); + routeToAlternate(null, null); } } @@ -368,65 +367,43 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public void routeToAlternate() + public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); - + boolean autocommit = txn == null; if (alternateExchange != null) { - List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties()); - final ServerMessage message = getMessage(); - if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null) + if(autocommit) { - queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties()); + txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); } + int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action); - - if (queues != null && queues.size() != 0) + txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action() { - final List<? extends BaseQueue> rerouteQueues = queues; - ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); - - txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() + public void postCommit() { - public void postCommit() - { - try - { - for (BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - - } - }); - - txn.dequeue(currentQueue, message, new ServerTransaction.Action() - { - public void postCommit() - { - delete(); - } + delete(); + } - public void onRollback() - { + public void onRollback() + { - } - }); + } + }); + if(autocommit) + { txn.commit(); } + return enqueues; + + } + else + { + return 0; } } |