summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java48
1 files changed, 29 insertions, 19 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index dbad5438dc..4cb07c3006 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.log4j.Logger;
import java.util.Set;
@@ -42,7 +44,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
+ private MessageReference _message;
private Set<Subscription> _rejectedBy = null;
@@ -75,6 +77,8 @@ public class QueueEntryImpl implements QueueEntry
private volatile long _entryId;
volatile QueueEntryImpl _next;
+ private boolean _deliveredToConsumer;
+ private boolean _redelivered;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
@@ -84,18 +88,18 @@ public class QueueEntryImpl implements QueueEntry
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _message = message == null ? null : message.newReference();
_entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _message = message == null ? null : message.newReference();
}
protected void setEntryId(long entryId)
@@ -113,9 +117,9 @@ public class QueueEntryImpl implements QueueEntry
return _queueEntryList.getQueue();
}
- public AMQMessage getMessage()
+ public ServerMessage getMessage()
{
- return _message;
+ return _message == null ? null : _message.getMessage();
}
public long getSize()
@@ -125,12 +129,21 @@ public class QueueEntryImpl implements QueueEntry
public boolean getDeliveredToConsumer()
{
- return getMessage().getDeliveredToConsumer();
+ return _deliveredToConsumer;
}
public boolean expired() throws AMQException
{
- return getMessage().expired(getQueue());
+ long expiration = getMessage().getExpiration();
+ if (expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > expiration);
+ }
+
+ return false;
+
}
public boolean isAcquired()
@@ -167,7 +180,7 @@ public class QueueEntryImpl implements QueueEntry
public void setDeliveredToSubscription()
{
- getMessage().setDeliveredToConsumer();
+ _deliveredToConsumer = true;
}
public void release()
@@ -175,20 +188,15 @@ public class QueueEntryImpl implements QueueEntry
_stateUpdater.set(this,AVAILABLE_STATE);
}
- public String debugIdentity()
- {
- return getMessage().debugIdentity();
- }
-
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().isImmediate() && !_deliveredToConsumer;
}
public void setRedelivered(boolean b)
{
- getMessage().setRedelivered(b);
+ _redelivered = b;
}
public Subscription getDeliveredSubscription()
@@ -223,7 +231,7 @@ public class QueueEntryImpl implements QueueEntry
}
else
{
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ _log.warn("Requesting rejection by null subscriber:" + this);
}
}
@@ -284,7 +292,9 @@ public class QueueEntryImpl implements QueueEntry
{
if(delete())
{
- getMessage().decrementReference(storeContext);
+ StoreContext sc = StoreContext.setCurrentContext(storeContext);
+ _message.release();
+ StoreContext.setCurrentContext(sc);
}
}