diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 179 |
1 files changed, 85 insertions, 94 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 5b57e40a82..f1e50427b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -20,8 +20,14 @@ */ package org.apache.qpid.server.queue; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.AMQMessageHeader; @@ -31,23 +37,11 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - - -public class QueueEntryImpl implements QueueEntry +public abstract class QueueEntryImpl implements QueueEntry { - - /** - * Used for debugging purposes. - */ private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); - private final SimpleQueueEntryList _queueEntryList; + private final QueueEntryList _queueEntryList; private MessageReference _message; @@ -80,22 +74,26 @@ public class QueueEntryImpl implements QueueEntry private volatile long _entryId; - volatile QueueEntryImpl _next; - private static final int DELIVERED_TO_CONSUMER = 1; private static final int REDELIVERED = 2; private volatile int _deliveryState; + /** Number of times this message has been delivered */ + private volatile int _deliveryCount = 0; + private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater + .newUpdater(QueueEntryImpl.class, "_deliveryCount"); + - QueueEntryImpl(SimpleQueueEntryList queueEntryList) + + public QueueEntryImpl(QueueEntryList<?> queueEntryList) { this(queueEntryList,null,Long.MIN_VALUE); _state = DELETED_STATE; } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) + public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId) { _queueEntryList = queueEntryList; @@ -104,7 +102,7 @@ public class QueueEntryImpl implements QueueEntry _entryIdUpdater.set(this, entryId); } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) + public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; _message = message == null ? null : message.newReference(); @@ -233,8 +231,13 @@ public class QueueEntryImpl implements QueueEntry if(state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(); + Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription(); + if (subscription != null) + { + subscription.releaseQueueEntry(this); + } } - + if(!getQueue().isDeleted()) { getQueue().requeue(this); @@ -311,16 +314,15 @@ public class QueueEntryImpl implements QueueEntry public Subscription getDeliveredSubscription() { - EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) - { - return ((SubscriptionAcquiredState) state).getSubscription(); - } - else - { - return null; - } - + EntryState state = _state; + if (state instanceof SubscriptionAcquiredState) + { + return ((SubscriptionAcquiredState) state).getSubscription(); + } + else + { + return null; + } } public void reject() @@ -409,50 +411,51 @@ public class QueueEntryImpl implements QueueEntry public void routeToAlternate() { final AMQQueue currentQueue = getQueue(); - Exchange alternateExchange = currentQueue.getAlternateExchange(); + Exchange alternateExchange = currentQueue.getAlternateExchange(); - if(alternateExchange != null) + if (alternateExchange != null) + { + final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); + final ServerMessage message = getMessage(); + if (rerouteQueues != null && rerouteQueues.size() != 0) { - final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); - final ServerMessage message = getMessage(); - if(rerouteQueues != null && rerouteQueues.size() != 0) - { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); - txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() { - public void postCommit() + ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); + + txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() + { + public void postCommit() + { + try { - try + for (BaseQueue queue : rerouteQueues) { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); + queue.enqueue(message); } } - - public void onRollback() + catch (AMQException e) { - + throw new RuntimeException(e); } - }); - txn.dequeue(currentQueue,message, - new ServerTransaction.Action() - { - public void postCommit() - { - discard(); - } - - public void onRollback() - { - - } - }); + } + + public void onRollback() + { + + } + }); + txn.dequeue(currentQueue, message, new ServerTransaction.Action() + { + public void postCommit() + { + discard(); + } + + public void onRollback() + { + + } + }); } } } @@ -492,33 +495,6 @@ public class QueueEntryImpl implements QueueEntry return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } - public QueueEntryImpl getNext() - { - - QueueEntryImpl next = nextNode(); - while(next != null && next.isDispensed() ) - { - - final QueueEntryImpl newNext = next.nextNode(); - if(newNext != null) - { - SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); - next = nextNode(); - } - else - { - next = null; - } - - } - return next; - } - - QueueEntryImpl nextNode() - { - return _next; - } - public boolean isDeleted() { return _state == DELETED_STATE; @@ -530,7 +506,7 @@ public class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - _queueEntryList.advanceHead(); + _queueEntryList.entryDeleted(this); return true; } else @@ -554,4 +530,19 @@ public class QueueEntryImpl implements QueueEntry return _state.isDispensed(); } + public int getDeliveryCount() + { + return _deliveryCount; + } + + public void incrementDeliveryCount() + { + _deliveryCountUpdater.incrementAndGet(this); + } + + public void decrementDeliveryCount() + { + _deliveryCountUpdater.decrementAndGet(this); + } + } |