summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java322
1 files changed, 238 insertions, 84 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ba14be5580..b6e6365189 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -20,30 +20,29 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.log4j.Logger;
-import java.util.Set;
import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class QueueEntryImpl implements QueueEntry
{
- /**
- * Used for debugging purposes.
- */
+ /** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
+ private AtomicReference<AMQMessage> _messageRef;
private boolean _redelivered;
@@ -52,44 +51,51 @@ public class QueueEntryImpl implements QueueEntry
private volatile EntryState _state = AVAILABLE_STATE;
private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
+ AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
_stateUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, EntryState.class, "_state");
-
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, EntryState.class, "_state");
private volatile Set<StateChangeListener> _stateChangeListeners;
private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
- _listenersUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
-
+ AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+ _listenersUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
private static final
- AtomicLongFieldUpdater<QueueEntryImpl>
+ AtomicLongFieldUpdater<QueueEntryImpl>
_entryIdUpdater =
- AtomicLongFieldUpdater.newUpdater
- (QueueEntryImpl.class, "_entryId");
-
+ AtomicLongFieldUpdater.newUpdater
+ (QueueEntryImpl.class, "_entryId");
private volatile long _entryId;
volatile QueueEntryImpl _next;
+ private long _messageSize;
+ private QueueBackingStore _backingStore;
+ private AtomicBoolean _flowed;
+ private Long _messageId;
+
+ private byte _flags = 0;
+
+ private long _expiration;
+
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+ private boolean _persistent;
+ private boolean _hasBeenUnloaded = false;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
- this(queueEntryList,null,Long.MIN_VALUE);
+ this(queueEntryList, null, Long.MIN_VALUE);
_state = DELETED_STATE;
}
-
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
{
- _queueEntryList = queueEntryList;
- _message = message;
+ this(queueEntryList, message);
_entryIdUpdater.set(this, entryId);
}
@@ -97,7 +103,21 @@ public class QueueEntryImpl implements QueueEntry
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _messageRef = new AtomicReference<AMQMessage>(message);
+ if (message != null)
+ {
+ _messageId = message.getMessageId();
+ _messageSize = message.getSize();
+
+ if (message.isImmediate())
+ {
+ _flags |= IMMEDIATE;
+ }
+ _expiration = message.getExpiration();
+ _persistent = message.isPersistent();
+ }
+ _backingStore = queueEntryList.getBackingStore();
+ _flowed = new AtomicBoolean(false);
}
protected void setEntryId(long entryId)
@@ -117,22 +137,50 @@ public class QueueEntryImpl implements QueueEntry
public AMQMessage getMessage()
{
- return _message;
+ return load();
+ }
+
+ public Long getMessageId()
+ {
+ return _messageId;
}
public long getSize()
{
- return getMessage().getSize();
+ return _messageSize;
}
public boolean getDeliveredToConsumer()
{
- return getMessage().getDeliveredToConsumer();
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
+ }
+
+ public void setDeliveredToSubscription()
+ {
+ _flags |= DELIVERED_TO_CONSUMER;
+
+ // We have delivered this message so we can unload it if we are flowed.
+ if (_queueEntryList.isFlowed())
+ {
+ unload();
+ }
}
public boolean expired() throws AMQException
{
- return getMessage().expired();
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > _expiration);
+ }
+
+ return false;
+ }
+
+ public void setExpiration(final long expiration)
+ {
+ _expiration = expiration;
}
public boolean isAcquired()
@@ -140,6 +188,11 @@ public class QueueEntryImpl implements QueueEntry
return _state.getState() == State.ACQUIRED;
}
+ public boolean isAvailable()
+ {
+ return _state.getState() == State.AVAILABLE;
+ }
+
public boolean acquire()
{
return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
@@ -147,8 +200,8 @@ public class QueueEntryImpl implements QueueEntry
private boolean acquire(final EntryState state)
{
- boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
- if(acquired && _stateChangeListeners != null)
+ boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
+ if (acquired && _stateChangeListeners != null)
{
notifyStateChange(State.AVAILABLE, State.ACQUIRED);
}
@@ -167,35 +220,41 @@ public class QueueEntryImpl implements QueueEntry
return (_state instanceof SubscriptionAcquiredState);
}
- public void setDeliveredToSubscription()
- {
- getMessage().setDeliveredToConsumer();
- }
-
public void release()
{
- _stateUpdater.set(this,AVAILABLE_STATE);
+ _stateUpdater.set(this, AVAILABLE_STATE);
}
public String debugIdentity()
{
- return getMessage().debugIdentity();
- }
+ String entry = "[State:" + _state.getState().name() + "]";
+ AMQMessage message = _messageRef.get();
- public boolean immediateAndNotDelivered()
+ if (message == null)
+ {
+ return entry + "(Message Unloaded ID:" + _messageId + ")";
+ }
+ else
+ {
+
+ return entry + message.debugIdentity();
+ }
+ }
+
+ public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
}
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _message.getContentHeaderBody();
+ return getMessage().getContentHeaderBody();
}
public boolean isPersistent() throws AMQException
{
- return _message.isPersistent();
+ return _persistent;
}
public boolean isRedelivered()
@@ -206,21 +265,21 @@ public class QueueEntryImpl implements QueueEntry
public void setRedelivered(boolean redelivered)
{
_redelivered = redelivered;
- // todo - here we could mark this message as redelivered so we don't have to mark
- // all messages on recover as redelivered.
+ // todo - here we could record this message as redelivered on this queue in the transactionLog
+ // so we don't have to mark all messages on recover as redelivered.
}
public Subscription getDeliveredSubscription()
{
- EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
- {
- return ((SubscriptionAcquiredState) state).getSubscription();
- }
- else
- {
- return null;
- }
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ return ((SubscriptionAcquiredState) state).getSubscription();
+ }
+ else
+ {
+ return null;
+ }
}
@@ -247,7 +306,7 @@ public class QueueEntryImpl implements QueueEntry
}
public boolean isRejectedBy(Subscription subscription)
- {
+ {
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
@@ -259,11 +318,10 @@ public class QueueEntryImpl implements QueueEntry
}
}
-
public void requeue(final StoreContext storeContext) throws AMQException
{
getQueue().requeue(storeContext, this);
- if(_stateChangeListeners != null)
+ if (_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
}
@@ -273,7 +331,7 @@ public class QueueEntryImpl implements QueueEntry
{
EntryState state = _state;
- if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+ if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
if (state instanceof SubscriptionAcquiredState)
{
@@ -281,41 +339,34 @@ public class QueueEntryImpl implements QueueEntry
s.restoreCredit(this);
}
+ _queueEntryList.dequeued(this);
+
getQueue().dequeue(storeContext, this);
- if(_stateChangeListeners != null)
+
+ if (_stateChangeListeners != null)
{
- notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+ notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
}
-
}
-
}
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for (StateChangeListener l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
- if(delete())
- {
- getMessage().decrementReference(storeContext);
- }
- }
-
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
- {
- //if the queue is null then the message is waiting to be acked, but has been removed.
+ //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
if (getQueue() != null)
{
dequeue(storeContext);
}
- dispose(storeContext);
+ delete();
}
public boolean isQueueDeleted()
@@ -326,7 +377,7 @@ public class QueueEntryImpl implements QueueEntry
public void addStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
- if(listeners == null)
+ if (listeners == null)
{
_listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
listeners = _stateChangeListeners;
@@ -338,7 +389,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean removeStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
- if(listeners != null)
+ if (listeners != null)
{
return listeners.remove(listener);
}
@@ -346,10 +397,108 @@ public class QueueEntryImpl implements QueueEntry
return false;
}
+ public void unload()
+ {
+ //Get the currently loaded message
+ AMQMessage message = _messageRef.get();
+
+ // If we have a message in memory and we have a valid backingStore attempt to unload
+ if (message != null && _backingStore != null)
+ {
+ try
+ {
+ // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure
+ // multiple initial unloads are unloads
+ _backingStore.unload(message);
+ _hasBeenUnloaded = true;
+ _messageRef.set(null);
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unloaded:" + debugIdentity());
+ }
+
+
+ // Clear the message reference if the loaded message is still the one we are processing.
+
+ //Update the memoryState if this load call resulted in the message being purged from memory
+ if (!_flowed.getAndSet(true))
+ {
+ _queueEntryList.entryUnloadedUpdateMemory(this);
+ }
+
+ }
+ catch (UnableToFlowMessageException utfme)
+ {
+ // There is no recovery needed as the memory states remain unchanged.
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
+ }
+ }
+ }
+ }
+
+ public AMQMessage load()
+ {
+ // MessageId and Backing store are null in test scenarios, normally this is not the case.
+ if (_messageId != null && _backingStore != null)
+ {
+ // See if we have the message currently in memory to return
+ AMQMessage message = _messageRef.get();
+ // if we don't then we need to start a load process.
+ if (message == null)
+ {
+ //Synchronize here to ensure only the first thread that attempts to load will perform the load from the
+ // backing store.
+ synchronized (this)
+ {
+ // Check again to see if someone else ahead of us loaded the message
+ message = _messageRef.get();
+ // if we still don't have the message then we need to start a load process.
+ if (message == null)
+ {
+ // Load the message and keep a reference to it
+ message = _backingStore.load(_messageId);
+ // Set the message reference
+ _messageRef.set(message);
+ }
+ else
+ {
+ // If someone else loaded the message then we can jump out here as the Memory Updates will
+ // have been performed by the loading thread
+ return message;
+ }
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Loaded:" + debugIdentity());
+ }
+
+ //Update the memoryState if this load call resulted in the message comming in to memory
+ if (_flowed.getAndSet(false))
+ {
+ _queueEntryList.entryLoadedUpdateMemory(this);
+ }
+ }
+
+ // Return the message that was either already in memory or the value we just loaded.
+ return message;
+ }
+ // This can be null but only in the case where we have no messageId
+ // in the case where we have no backingStore then we will never have unloaded the message
+ return _messageRef.get();
+ }
+
+ public boolean isFlowed()
+ {
+ return _flowed.get();
+ }
public int compareTo(final QueueEntry o)
{
- QueueEntryImpl other = (QueueEntryImpl)o;
+ QueueEntryImpl other = (QueueEntryImpl) o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
@@ -357,13 +506,13 @@ public class QueueEntryImpl implements QueueEntry
{
QueueEntryImpl next = nextNode();
- while(next != null && next.isDeleted())
+ while (next != null && next.isDeleted())
{
final QueueEntryImpl newNext = next.nextNode();
- if(newNext != null)
+ if (newNext != null)
{
- SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext);
next = nextNode();
}
else
@@ -389,9 +538,13 @@ public class QueueEntryImpl implements QueueEntry
{
EntryState state = _state;
- if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
+ if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
+ if (_backingStore != null && _hasBeenUnloaded)
+ {
+ _backingStore.delete(_messageId);
+ }
return true;
}
else
@@ -404,4 +557,5 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
}