diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java | 65 |
1 files changed, 64 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 0421a66abf..be4ac9d427 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -23,17 +23,21 @@ package org.apache.qpid.server.consumer; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget { - + private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class); private final AtomicReference<State> _state; private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new @@ -41,6 +45,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicInteger _stateActivates = new AtomicInteger(); + private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue(); protected AbstractConsumerTarget(final State initialState) @@ -48,6 +53,26 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _state = new AtomicReference<State>(initialState); } + @Override + public void processPending() + { + while(hasMessagesToSend()) + { + sendNextMessage(); + } + + processClosed(); + } + + protected abstract void processClosed(); + + @Override + public final boolean isSuspended() + { + return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended(); + } + + protected abstract boolean doIsSuspended(); public final State getState() { @@ -101,6 +126,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget } } + @Override public final void notifyCurrentState() { @@ -136,4 +162,41 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget _stateChangeLock.unlock(); } + @Override + public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + { + _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch)); + + getSessionModel().getConnectionModel().notifyWork(); + return entry.getMessage().getSize(); + } + + protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch); + + @Override + public boolean hasMessagesToSend() + { + return !_queue.isEmpty(); + } + + @Override + public void sendNextMessage() + { + ConsumerMessageInstancePair consumerMessage = _queue.peek(); + if (consumerMessage != null) + { + _queue.poll(); + + ConsumerImpl consumer = consumerMessage.getConsumer(); + MessageInstance entry = consumerMessage.getEntry(); + boolean batch = consumerMessage.isBatch(); + doSend(consumer, entry, batch); + + if (consumer.acquires()) + { + entry.unlockAcquisition(); + } + } + + } } |