diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java | 130 |
1 files changed, 97 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 78310e8eb3..d43e20eb89 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -39,11 +39,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.Queue; /** - * Encapsulation of a supscription to a queue. - * <p/> - * Ties together the protocol session of a subscriber, the consumer tag that - * was given out by the broker and the channel id. - * <p/> + * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id. <p/> */ public class SubscriptionImpl implements Subscription { @@ -59,40 +56,46 @@ public class SubscriptionImpl implements Subscription private Queue<AMQMessage> _messages; + private Queue<AMQMessage> _resendQueue; + private final boolean _noLocal; - /** - * True if messages need to be acknowledged - */ + /** True if messages need to be acknowledged */ private final boolean _acks; private FilterManager _filters; private final boolean _isBrowser; private final Boolean _autoClose; private boolean _closed = false; + private DeliveryManager _deliveryManager; + public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, + boolean acks, FieldTable filters, boolean noLocal, + DeliveryManager deliveryManager) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager); } - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) + public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, + DeliveryManager deliveryManager) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager); } } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks) + String consumerTag, boolean acks, DeliveryManager deliveryManager) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null, false); + this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks, FieldTable filters, boolean noLocal) + String consumerTag, boolean acks, FieldTable filters, boolean noLocal, + DeliveryManager deliveryManager) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); @@ -107,6 +110,7 @@ public class SubscriptionImpl implements Subscription sessionKey = protocolSession.getKey(); _acks = acks; _noLocal = noLocal; + _deliveryManager = deliveryManager; _filters = FilterManagerFactory.createManager(filters); @@ -165,7 +169,7 @@ public class SubscriptionImpl implements Subscription String consumerTag) throws AMQException { - this(channel, protocolSession, consumerTag, false); + this(channel, protocolSession, consumerTag, false, null); } public boolean equals(Object o) @@ -173,9 +177,7 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** - * Equality holds if the session matches and the channel and consumer tag are the same. - */ + /** Equality holds if the session matches and the channel and consumer tag are the same. */ private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) @@ -194,11 +196,12 @@ public class SubscriptionImpl implements Subscription } /** - * This method can be called by each of the publisher threads. - * As a result all changes to the channel object must be thread safe. + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. * * @param msg * @param queue + * * @throws AMQException */ public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException @@ -225,7 +228,7 @@ public class SubscriptionImpl implements Subscription // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -259,7 +262,7 @@ public class SubscriptionImpl implements Subscription { queue.dequeue(msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -376,6 +379,11 @@ public class SubscriptionImpl implements Subscription public void close() { + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + requeue(); + } + if (!_closed) { _logger.info("Closing autoclose subscription:" + this); @@ -383,18 +391,74 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag // consumerTag - )); + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag // consumerTag + )); _closed = true; } } + private void requeue() + { + //fixme + _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages"); + } + public boolean isBrowser() { return _isBrowser; } + public Queue<AMQMessage> getResendQueue() + { + if (_resendQueue == null) + { + _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + } + return _resendQueue; + } + + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + return _resendQueue; + } + + if (_filters != null) + { + if (isAutoClose()) + { + if (_messages.isEmpty()) + { + close(); + return null; + } + } + return _messages; + } + else // we want the DM queue + { + return messages; + } + } + + public void addToResendQueue(AMQMessage msg) + { + // add to our resend queue + getResendQueue().add(msg); + + // Mark Queue has having content. + if (_deliveryManager == null) + { + _logger.error("Delivery Manager is null won't be able to resend messages"); + } + else + { + _deliveryManager.setQueueHasContent(this); + } + } private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { @@ -402,13 +466,13 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag, // consumerTag - deliveryTag, // deliveryTag - exchange, // exchange - false, // redelivered - routingKey // routingKey - ); + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag, // consumerTag + deliveryTag, // deliveryTag + exchange, // exchange + false, // redelivered + routingKey // routingKey + ); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? deliverFrame.writePayload(buf); buf.flip(); |