diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java | 153 |
1 files changed, 126 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index f4e7482396..4272541298 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -24,11 +24,13 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -64,6 +66,9 @@ public class SubscriptionImpl implements Subscription */ private final boolean _acks; private FilterManager _filters; + private final boolean _isBrowser; + private final Boolean _autoClose; + private boolean _closed = false; public static class Factory implements SubscriptionFactory { @@ -105,9 +110,48 @@ public class SubscriptionImpl implements Subscription _filters = FilterManagerFactory.createManager(filters); + + if (_filters != null) + { + Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue()); + if (isBrowser != null) + { + _isBrowser = (Boolean) isBrowser; + } + else + { + _isBrowser = false; + } + } + else + { + _isBrowser = false; + } + + + if (_filters != null) + { + Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); + if (autoClose != null) + { + _autoClose = (Boolean) autoClose; + } + else + { + _autoClose = false; + } + } + else + { + _autoClose = false; + } + + if (_filters != null) { _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + + } else { @@ -116,6 +160,7 @@ public class SubscriptionImpl implements Subscription } } + public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException @@ -160,44 +205,78 @@ public class SubscriptionImpl implements Subscription { if (msg != null) { - try + if (_isBrowser) + { + sendToBrowser(msg, queue); + } + else { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + sendToConsumer(msg, queue); + } + } + else + { + _logger.error("Attempt to send Null message", new NullPointerException()); + } + } - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + { + // We don't decrement the reference here as we don't want to consume the message + // but we do want to send it to the client. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) - { - queue.dequeue(msg); - } - synchronized(channel) - { - long deliveryTag = channel.getNextDeliveryTag(); + synchronized(channel) + { + long deliveryTag = channel.getNextDeliveryTag(); + + // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client + // received the message. If it is lost in transit that is not important. + if (_acks) + { + channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); + } + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } + protocolSession.writeFrame(frame); + } + } - ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); - AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException + { + try + { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - protocolSession.writeFrame(frame); - } + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) + { + queue.dequeue(msg); } - finally + synchronized(channel) { - msg.setDeliveredToConsumer(); + long deliveryTag = channel.getNextDeliveryTag(); + + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); + AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); + + protocolSession.writeFrame(frame); } } - else + finally { - _logger.error("Attempt to send Null message", new NullPointerException()); + msg.setDeliveredToConsumer(); } } @@ -290,6 +369,26 @@ public class SubscriptionImpl implements Subscription } } + public boolean isAutoClose() + { + return _autoClose; + } + + public void close() + { + if (!_closed) + { + _logger.info("Closing autoclose subscription:" + this); + protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag)); + _closed = true; + } + } + + public boolean isBrowser() + { + return _isBrowser; + } + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { |