diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue')
5 files changed, 97 insertions, 15 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 54f3c4de09..545a1d941d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, entry, false); + if(sub.acquires()) + { + entry.unlockAcquisition(); + } } } } @@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> else { deliverMessage(sub, node, batch); + if(sub.acquires()) + { + node.unlockAcquisition(); + } } } @@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (!node.isDeleted()) { // If the node has expired then acquire it - if (node.expired() && node.acquire()) + if (node.expired()) { - if (_logger.isDebugEnabled()) + boolean acquiredForDequeueing = node.acquire(); + if(!acquiredForDequeueing && node.getDeliveredToConsumer()) + { + QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer(); + acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer); + if(acquiredForDequeueing) + { + consumer.acquisitionRemoved(node); + } + } + + if(acquiredForDequeueing) { - _logger.debug("Dequeuing expired node " + node); + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing expired node " + node); + } + // Then dequeue it. + dequeueEntry(node); } - // Then dequeue it. - dequeueEntry(node); } else { @@ -2527,7 +2549,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final ServerTransaction txn, final Action<? super MessageInstance> postEnqueueAction) { - txn.enqueue(this,message, new ServerTransaction.Action() + if(!message.isReferenced(this)) + { + txn.enqueue(this, message, new ServerTransaction.Action() { MessageReference _reference = message.newReference(); @@ -2549,6 +2573,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } }); return 1; + } + else + { + return 0; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 5ffbc0dbaa..71b7636159 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -39,6 +39,8 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, void send(QueueEntry entry, boolean batch); + void acquisitionRemoved(QueueEntry node); + void queueDeleted(); SubFlushRunner getRunner(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 55782ac095..d80aa92007 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -477,6 +477,13 @@ class QueueConsumerImpl } @Override + public void acquisitionRemoved(final QueueEntry node) + { + _target.acquisitionRemoved(node); + _queue.decrementUnackedMsgCount(node); + } + + @Override public String getDistributionMode() { return _distributionMode; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 49644f8d76..6c541d78ef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -103,7 +103,7 @@ public abstract class QueueEntryImpl implements QueueEntry { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); _entryIdUpdater.set(this, entryId); populateInstanceProperties(); @@ -112,7 +112,7 @@ public abstract class QueueEntryImpl implements QueueEntry public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); populateInstanceProperties(); } @@ -210,7 +210,7 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean acquire(ConsumerImpl sub) { - final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); + final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState()); if(acquired) { _deliveryCountUpdater.compareAndSet(this,-1,0); @@ -218,17 +218,57 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } + @Override + public boolean lockAcquisition() + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState()); + } + return state instanceof LockedAcquiredState; + } + + @Override + public boolean unlockAcquisition() + { + EntryState state = _state; + if(state instanceof LockedAcquiredState) + { + return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState()); + } + return false; + } + public boolean acquiredByConsumer() { - return (_state instanceof ConsumerAcquiredState); + return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState); } + @Override public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; - return state instanceof ConsumerAcquiredState - && ((ConsumerAcquiredState)state).getConsumer() == consumer; + return (state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + || (state instanceof LockedAcquiredState + && ((LockedAcquiredState)state).getConsumer() == consumer); + } + + @Override + public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer) + { + EntryState state = _state; + if(state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer) + { + return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE); + } + else + { + return false; + } } public void release() @@ -238,7 +278,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) { - if(state instanceof ConsumerAcquiredState) + if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -268,6 +308,10 @@ public abstract class QueueEntryImpl implements QueueEntry { return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer(); } + else if (state instanceof LockedAcquiredState) + { + return (QueueConsumer) ((LockedAcquiredState) state).getConsumer(); + } else { return null; @@ -312,7 +356,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - if (state instanceof ConsumerAcquiredState) + if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState) { getQueue().decrementUnackedMsgCount(this); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 28dfc73a27..d4a91f2c0b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList { - AMQQueue getQueue(); + AMQQueue<?> getQueue(); QueueEntry add(ServerMessage message); |