summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java49
1 files changed, 23 insertions, 26 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 43982db2fd..a2113de8ea 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _needToClose = new AtomicBoolean();
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -99,6 +100,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return _consumers;
}
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -123,7 +125,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @throws org.apache.qpid.AMQException
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -131,17 +133,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
- }
-
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -184,7 +180,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -211,14 +207,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
ref.release();
- return size;
-
- }
- @Override
- public boolean allocateCredit(ServerMessage msg)
- {
- return true;
}
private static final ServerTransaction.Action NOOP =
@@ -250,11 +239,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public boolean allocateCredit(ServerMessage msg)
- {
- return getCreditManager().useCreditForMessage(msg.getSize());
- }
-
}
@@ -295,9 +279,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* @param batch
*/
@Override
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
+ // put queue entry on a list and then notify the connection to read list.
synchronized (getChannel())
{
@@ -309,12 +294,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
entry.addStateChangeListener(getReleasedStateChangeListener());
long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
- return size;
}
+
+
}
+
+
}
@@ -399,7 +387,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return subscriber + "]";
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
}
@@ -525,6 +514,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
{
if (isAutoClose())
{
+ _needToClose.set(true);
+ getChannel().getConnection().notifyWork();
+ }
+ }
+
+ @Override
+ protected void processClosed()
+ {
+ if (_needToClose.get() && getState() != State.CLOSED)
+ {
close();
confirmAutoClose();
}
@@ -533,8 +532,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
public void flushBatched()
{
_channel.getConnection().setDeferFlush(false);
-
- _channel.getConnection().flushBatched();
}
protected void addUnacknowledgedMessage(MessageInstance entry)