summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java65
1 files changed, 64 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 0421a66abf..be4ac9d427 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -23,17 +23,21 @@ package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
-
+ private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class);
private final AtomicReference<State> _state;
private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -41,6 +45,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicInteger _stateActivates = new AtomicInteger();
+ private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
protected AbstractConsumerTarget(final State initialState)
@@ -48,6 +53,26 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_state = new AtomicReference<State>(initialState);
}
+ @Override
+ public void processPending()
+ {
+ while(hasMessagesToSend())
+ {
+ sendNextMessage();
+ }
+
+ processClosed();
+ }
+
+ protected abstract void processClosed();
+
+ @Override
+ public final boolean isSuspended()
+ {
+ return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+ }
+
+ protected abstract boolean doIsSuspended();
public final State getState()
{
@@ -101,6 +126,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
}
}
+ @Override
public final void notifyCurrentState()
{
@@ -136,4 +162,41 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
_stateChangeLock.unlock();
}
+ @Override
+ public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ {
+ _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+
+ getSessionModel().getConnectionModel().notifyWork();
+ return entry.getMessage().getSize();
+ }
+
+ protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+
+ @Override
+ public boolean hasMessagesToSend()
+ {
+ return !_queue.isEmpty();
+ }
+
+ @Override
+ public void sendNextMessage()
+ {
+ ConsumerMessageInstancePair consumerMessage = _queue.peek();
+ if (consumerMessage != null)
+ {
+ _queue.poll();
+
+ ConsumerImpl consumer = consumerMessage.getConsumer();
+ MessageInstance entry = consumerMessage.getEntry();
+ boolean batch = consumerMessage.isBatch();
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.unlockAcquisition();
+ }
+ }
+
+ }
}