diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 70 |
1 files changed, 67 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index f10a4175db..74c034d305 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -25,10 +25,14 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.Transaction; import org.apache.log4j.Logger; import java.util.Set; import java.util.HashSet; +import java.util.List; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.CopyOnWriteArraySet; @@ -213,12 +217,21 @@ public class QueueEntryImpl implements QueueEntry public void release() { _stateUpdater.set(this,AVAILABLE_STATE); - getQueue().requeue(this); - if(_stateChangeListeners != null) + if(!getQueue().isDeleted()) { - notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + getQueue().requeue(this); + if(_stateChangeListeners != null) + { + notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + } + + } + else if(acquire()) + { + routeToAlternate(); } + } public boolean releaseButRetain() @@ -386,6 +399,57 @@ public class QueueEntryImpl implements QueueEntry dispose(); } + public void routeToAlternate() + { + final AMQQueue currentQueue = getQueue(); + Exchange alternateExchange = currentQueue.getAlternateExchange(); + + if(alternateExchange != null) + { + final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); + final ServerMessage message = getMessage(); + if(rerouteQueues != null && rerouteQueues.size() != 0) + { + Transaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); + + txn.enqueue(rerouteQueues, message, new Transaction.Action() { + public void postCommit() + { + try + { + for(AMQQueue queue : rerouteQueues) + { + QueueEntry entry = queue.enqueue(message); + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + public void onRollback() + { + + } + }); + txn.dequeue(currentQueue,message, + new Transaction.Action() + { + public void postCommit() + { + discard(); + } + + public void onRollback() + { + + } + }); + } + } + } + public boolean isQueueDeleted() { return getQueue().isDeleted(); |