summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java153
1 files changed, 126 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index f4e7482396..4272541298 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -24,11 +24,13 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -64,6 +66,9 @@ public class SubscriptionImpl implements Subscription
*/
private final boolean _acks;
private FilterManager _filters;
+ private final boolean _isBrowser;
+ private final Boolean _autoClose;
+ private boolean _closed = false;
public static class Factory implements SubscriptionFactory
{
@@ -105,9 +110,48 @@ public class SubscriptionImpl implements Subscription
_filters = FilterManagerFactory.createManager(filters);
+
+ if (_filters != null)
+ {
+ Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+ if (isBrowser != null)
+ {
+ _isBrowser = (Boolean) isBrowser;
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+
+
+ if (_filters != null)
+ {
+ Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+
+
if (_filters != null)
{
_messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
}
else
{
@@ -116,6 +160,7 @@ public class SubscriptionImpl implements Subscription
}
}
+
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
String consumerTag)
throws AMQException
@@ -160,44 +205,78 @@ public class SubscriptionImpl implements Subscription
{
if (msg != null)
{
- try
+ if (_isBrowser)
+ {
+ sendToBrowser(msg, queue);
+ }
+ else
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ sendToConsumer(msg, queue);
+ }
+ }
+ else
+ {
+ _logger.error("Attempt to send Null message", new NullPointerException());
+ }
+ }
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- queue.dequeue(msg);
- }
- synchronized(channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
+ // received the message. If it is lost in transit that is not important.
+ if (_acks)
+ {
+ channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
+ protocolSession.writeFrame(frame);
+ }
+ }
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+ private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ {
+ try
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- protocolSession.writeFrame(frame);
- }
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
+ {
+ queue.dequeue(msg);
}
- finally
+ synchronized(channel)
{
- msg.setDeliveredToConsumer();
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
+
+ protocolSession.writeFrame(frame);
}
}
- else
+ finally
{
- _logger.error("Attempt to send Null message", new NullPointerException());
+ msg.setDeliveredToConsumer();
}
}
@@ -290,6 +369,26 @@ public class SubscriptionImpl implements Subscription
}
}
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public void close()
+ {
+ if (!_closed)
+ {
+ _logger.info("Closing autoclose subscription:" + this);
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ _closed = true;
+ }
+ }
+
+ public boolean isBrowser()
+ {
+ return _isBrowser;
+ }
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{