summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java130
1 files changed, 97 insertions, 33 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 78310e8eb3..d43e20eb89 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -39,11 +39,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.Queue;
/**
- * Encapsulation of a supscription to a queue.
- * <p/>
- * Ties together the protocol session of a subscriber, the consumer tag that
- * was given out by the broker and the channel id.
- * <p/>
+ * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
*/
public class SubscriptionImpl implements Subscription
{
@@ -59,40 +56,46 @@ public class SubscriptionImpl implements Subscription
private Queue<AMQMessage> _messages;
+ private Queue<AMQMessage> _resendQueue;
+
private final boolean _noLocal;
- /**
- * True if messages need to be acknowledged
- */
+ /** True if messages need to be acknowledged */
private final boolean _acks;
private FilterManager _filters;
private final boolean _isBrowser;
private final Boolean _autoClose;
private boolean _closed = false;
+ private DeliveryManager _deliveryManager;
+
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+ boolean acks, FieldTable filters, boolean noLocal,
+ DeliveryManager deliveryManager) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager);
}
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+ DeliveryManager deliveryManager)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager);
}
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks)
+ String consumerTag, boolean acks, DeliveryManager deliveryManager)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null, false);
+ this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ String consumerTag, boolean acks, FieldTable filters, boolean noLocal,
+ DeliveryManager deliveryManager)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
@@ -107,6 +110,7 @@ public class SubscriptionImpl implements Subscription
sessionKey = protocolSession.getKey();
_acks = acks;
_noLocal = noLocal;
+ _deliveryManager = deliveryManager;
_filters = FilterManagerFactory.createManager(filters);
@@ -165,7 +169,7 @@ public class SubscriptionImpl implements Subscription
String consumerTag)
throws AMQException
{
- this(channel, protocolSession, consumerTag, false);
+ this(channel, protocolSession, consumerTag, false, null);
}
public boolean equals(Object o)
@@ -173,9 +177,7 @@ public class SubscriptionImpl implements Subscription
return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
}
- /**
- * Equality holds if the session matches and the channel and consumer tag are the same.
- */
+ /** Equality holds if the session matches and the channel and consumer tag are the same. */
private boolean equals(SubscriptionImpl psc)
{
return sessionKey.equals(psc.sessionKey)
@@ -194,11 +196,12 @@ public class SubscriptionImpl implements Subscription
}
/**
- * This method can be called by each of the publisher threads.
- * As a result all changes to the channel object must be thread safe.
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
*
* @param msg
* @param queue
+ *
* @throws AMQException
*/
public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
@@ -225,7 +228,7 @@ public class SubscriptionImpl implements Subscription
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -259,7 +262,7 @@ public class SubscriptionImpl implements Subscription
{
queue.dequeue(msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -376,6 +379,11 @@ public class SubscriptionImpl implements Subscription
public void close()
{
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ requeue();
+ }
+
if (!_closed)
{
_logger.info("Closing autoclose subscription:" + this);
@@ -383,18 +391,74 @@ public class SubscriptionImpl implements Subscription
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag // consumerTag
- ));
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag // consumerTag
+ ));
_closed = true;
}
}
+ private void requeue()
+ {
+ //fixme
+ _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages");
+ }
+
public boolean isBrowser()
{
return _isBrowser;
}
+ public Queue<AMQMessage> getResendQueue()
+ {
+ if (_resendQueue == null)
+ {
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ return _resendQueue;
+ }
+
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ return _resendQueue;
+ }
+
+ if (_filters != null)
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ close();
+ return null;
+ }
+ }
+ return _messages;
+ }
+ else // we want the DM queue
+ {
+ return messages;
+ }
+ }
+
+ public void addToResendQueue(AMQMessage msg)
+ {
+ // add to our resend queue
+ getResendQueue().add(msg);
+
+ // Mark Queue has having content.
+ if (_deliveryManager == null)
+ {
+ _logger.error("Delivery Manager is null won't be able to resend messages");
+ }
+ else
+ {
+ _deliveryManager.setQueueHasContent(this);
+ }
+ }
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
@@ -402,13 +466,13 @@ public class SubscriptionImpl implements Subscription
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- exchange, // exchange
- false, // redelivered
- routingKey // routingKey
- );
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ exchange, // exchange
+ false, // redelivered
+ routingKey // routingKey
+ );
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
deliverFrame.writePayload(buf);
buf.flip();