summaryrefslogtreecommitdiff
path: root/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java67
1 files changed, 22 insertions, 45 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ed61f1acf6..461d493437 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.txn.ServerTransaction;
import java.util.EnumMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -250,7 +249,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
else if(acquire())
{
- routeToAlternate();
+ routeToAlternate(null, null);
}
}
@@ -368,65 +367,43 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public void routeToAlternate()
+ public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
-
+ boolean autocommit = txn == null;
if (alternateExchange != null)
{
- List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties());
- final ServerMessage message = getMessage();
- if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
+ if(autocommit)
{
- queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties());
+ txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
+ int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
-
- if (queues != null && queues.size() != 0)
+ txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
{
- final List<? extends BaseQueue> rerouteQueues = queues;
- ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
-
- txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+ public void postCommit()
{
- public void postCommit()
- {
- try
- {
- for (BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
-
- }
- });
-
- txn.dequeue(currentQueue, message, new ServerTransaction.Action()
- {
- public void postCommit()
- {
- delete();
- }
+ delete();
+ }
- public void onRollback()
- {
+ public void onRollback()
+ {
- }
- });
+ }
+ });
+ if(autocommit)
+ {
txn.commit();
}
+ return enqueues;
+
+ }
+ else
+ {
+ return 0;
}
}