diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index fea5303e23..209553e8fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -227,9 +227,10 @@ public abstract class QueueEntryImpl implements QueueEntry public void release() { EntryState state = _state; - + if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) { + if(state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(); @@ -254,6 +255,7 @@ public abstract class QueueEntryImpl implements QueueEntry routeToAlternate(); } } + } public boolean releaseButRetain() @@ -267,7 +269,6 @@ public abstract class QueueEntryImpl implements QueueEntry Subscription sub = ((SubscriptionAcquiredState) state).getSubscription(); if(_stateUpdater.compareAndSet(this, state, sub.getAssignedState())) { - System.err.println("Message released (and retained)" + getMessage().getMessageNumber()); getQueue().requeue(this); if(_stateChangeListeners != null) { @@ -417,11 +418,19 @@ public abstract class QueueEntryImpl implements QueueEntry if (alternateExchange != null) { - final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); + InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this); + List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter); final ServerMessage message = getMessage(); - if (rerouteQueues != null && rerouteQueues.size() != 0) + if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null) { + queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter); + } + + + if (queues != null && queues.size() != 0) + { + final List<? extends BaseQueue> rerouteQueues = queues; ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() |