summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java179
1 files changed, 85 insertions, 94 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 5b57e40a82..f1e50427b1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -20,8 +20,14 @@
*/
package org.apache.qpid.server.queue;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -31,23 +37,11 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-
-public class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl implements QueueEntry
{
-
- /**
- * Used for debugging purposes.
- */
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final SimpleQueueEntryList _queueEntryList;
+ private final QueueEntryList _queueEntryList;
private MessageReference _message;
@@ -80,22 +74,26 @@ public class QueueEntryImpl implements QueueEntry
private volatile long _entryId;
- volatile QueueEntryImpl _next;
-
private static final int DELIVERED_TO_CONSUMER = 1;
private static final int REDELIVERED = 2;
private volatile int _deliveryState;
+ /** Number of times this message has been delivered */
+ private volatile int _deliveryCount = 0;
+ private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
+ .newUpdater(QueueEntryImpl.class, "_deliveryCount");
+
- QueueEntryImpl(SimpleQueueEntryList queueEntryList)
+
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
@@ -104,7 +102,7 @@ public class QueueEntryImpl implements QueueEntry
_entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
@@ -233,8 +231,13 @@ public class QueueEntryImpl implements QueueEntry
if(state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount();
+ Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
+ if (subscription != null)
+ {
+ subscription.releaseQueueEntry(this);
+ }
}
-
+
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
@@ -311,16 +314,15 @@ public class QueueEntryImpl implements QueueEntry
public Subscription getDeliveredSubscription()
{
- EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
- {
- return ((SubscriptionAcquiredState) state).getSubscription();
- }
- else
- {
- return null;
- }
-
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ return ((SubscriptionAcquiredState) state).getSubscription();
+ }
+ else
+ {
+ return null;
+ }
}
public void reject()
@@ -409,50 +411,51 @@ public class QueueEntryImpl implements QueueEntry
public void routeToAlternate()
{
final AMQQueue currentQueue = getQueue();
- Exchange alternateExchange = currentQueue.getAlternateExchange();
+ Exchange alternateExchange = currentQueue.getAlternateExchange();
- if(alternateExchange != null)
+ if (alternateExchange != null)
+ {
+ final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ final ServerMessage message = getMessage();
+ if (rerouteQueues != null && rerouteQueues.size() != 0)
{
- final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
- final ServerMessage message = getMessage();
- if(rerouteQueues != null && rerouteQueues.size() != 0)
- {
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
- txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
- public void postCommit()
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+ txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ try
{
- try
+ for (BaseQueue queue : rerouteQueues)
{
- for(BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
+ queue.enqueue(message);
}
}
-
- public void onRollback()
+ catch (AMQException e)
{
-
+ throw new RuntimeException(e);
}
- });
- txn.dequeue(currentQueue,message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- discard();
- }
-
- public void onRollback()
- {
-
- }
- });
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(currentQueue, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
}
}
}
@@ -492,33 +495,6 @@ public class QueueEntryImpl implements QueueEntry
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
- public QueueEntryImpl getNext()
- {
-
- QueueEntryImpl next = nextNode();
- while(next != null && next.isDispensed() )
- {
-
- final QueueEntryImpl newNext = next.nextNode();
- if(newNext != null)
- {
- SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
- next = nextNode();
- }
- else
- {
- next = null;
- }
-
- }
- return next;
- }
-
- QueueEntryImpl nextNode()
- {
- return _next;
- }
-
public boolean isDeleted()
{
return _state == DELETED_STATE;
@@ -530,7 +506,7 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.entryDeleted(this);
return true;
}
else
@@ -554,4 +530,19 @@ public class QueueEntryImpl implements QueueEntry
return _state.isDispensed();
}
+ public int getDeliveryCount()
+ {
+ return _deliveryCount;
+ }
+
+ public void incrementDeliveryCount()
+ {
+ _deliveryCountUpdater.incrementAndGet(this);
+ }
+
+ public void decrementDeliveryCount()
+ {
+ _deliveryCountUpdater.decrementAndGet(this);
+ }
+
}