summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java70
1 files changed, 67 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index f10a4175db..74c034d305 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -25,10 +25,14 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.Transaction;
import org.apache.log4j.Logger;
import java.util.Set;
import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -213,12 +217,21 @@ public class QueueEntryImpl implements QueueEntry
public void release()
{
_stateUpdater.set(this,AVAILABLE_STATE);
- getQueue().requeue(this);
- if(_stateChangeListeners != null)
+ if(!getQueue().isDeleted())
{
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+
+ }
+ else if(acquire())
+ {
+ routeToAlternate();
}
+
}
public boolean releaseButRetain()
@@ -386,6 +399,57 @@ public class QueueEntryImpl implements QueueEntry
dispose();
}
+ public void routeToAlternate()
+ {
+ final AMQQueue currentQueue = getQueue();
+ Exchange alternateExchange = currentQueue.getAlternateExchange();
+
+ if(alternateExchange != null)
+ {
+ final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ final ServerMessage message = getMessage();
+ if(rerouteQueues != null && rerouteQueues.size() != 0)
+ {
+ Transaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+ txn.enqueue(rerouteQueues, message, new Transaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ for(AMQQueue queue : rerouteQueues)
+ {
+ QueueEntry entry = queue.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(currentQueue,message,
+ new Transaction.Action()
+ {
+ public void postCommit()
+ {
+ discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+ }
+ }
+
public boolean isQueueDeleted()
{
return getQueue().isDeleted();