diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java | 49 |
1 files changed, 23 insertions, 26 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 43982db2fd..a2113de8ea 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>(); + private final AtomicBoolean _needToClose = new AtomicBoolean(); public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -99,6 +100,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return _consumers; } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -123,7 +125,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @throws org.apache.qpid.AMQException */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -131,17 +133,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, @@ -184,7 +180,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -211,14 +207,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } ref.release(); - return size; - - } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; } private static final ServerTransaction.Action NOOP = @@ -250,11 +239,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } - public boolean allocateCredit(ServerMessage msg) - { - return getCreditManager().useCreditForMessage(msg.getSize()); - } - } @@ -295,9 +279,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { + // put queue entry on a list and then notify the connection to read list. synchronized (getChannel()) { @@ -309,12 +294,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen entry.addStateChangeListener(getReleasedStateChangeListener()); long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - return size; } + + } + + } @@ -399,7 +387,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return subscriber + "]"; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); } @@ -525,6 +514,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen { if (isAutoClose()) { + _needToClose.set(true); + getChannel().getConnection().notifyWork(); + } + } + + @Override + protected void processClosed() + { + if (_needToClose.get() && getState() != State.CLOSED) + { close(); confirmAutoClose(); } @@ -533,8 +532,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public void flushBatched() { _channel.getConnection().setDeferFlush(false); - - _channel.getConnection().flushBatched(); } protected void addUnacknowledgedMessage(MessageInstance entry) |