summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java232
1 files changed, 205 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 6bdfeccc0f..ede7731a06 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -21,10 +21,10 @@
package org.apache.qpid.server.queue;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
@@ -37,6 +37,8 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
/**
* Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
@@ -52,9 +54,11 @@ public class SubscriptionImpl implements Subscription
public final AMQShortString consumerTag;
- private final Object sessionKey;
+ private final Object _sessionKey;
- private Queue<AMQMessage> _messages;
+ private MessageQueue<AMQMessage> _messages;
+
+ private Queue<AMQMessage> _resendQueue;
private final boolean _noLocal;
@@ -63,20 +67,27 @@ public class SubscriptionImpl implements Subscription
private FilterManager _filters;
private final boolean _isBrowser;
private final Boolean _autoClose;
- private boolean _closed = false;
+ private boolean _sentClose = false;
+
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private AMQQueue _queue;
+ private final AtomicBoolean _sendLock = new AtomicBoolean(false);
+
+
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, boolean acks, FieldTable filters,
+ boolean noLocal, AMQQueue queue) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null);
}
}
@@ -84,25 +95,27 @@ public class SubscriptionImpl implements Subscription
AMQShortString consumerTag, boolean acks)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null, false);
+ this(channelId, protocolSession, consumerTag, acks, null, false, null);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ AMQShortString consumerTag, boolean acks, FieldTable filters,
+ boolean noLocal, AMQQueue queue)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
- {
+ {
throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
}
this.channel = channel;
this.protocolSession = protocolSession;
this.consumerTag = consumerTag;
- sessionKey = protocolSession.getKey();
+ _sessionKey = protocolSession.getKey();
_acks = acks;
_noLocal = noLocal;
+ _queue = queue;
_filters = FilterManagerFactory.createManager(filters);
@@ -145,9 +158,7 @@ public class SubscriptionImpl implements Subscription
if (_filters != null)
{
- _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-
-
+ _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
else
{
@@ -169,30 +180,47 @@ public class SubscriptionImpl implements Subscription
return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
}
- /** Equality holds if the session matches and the channel and consumer tag are the same. */
+ /**
+ * Equality holds if the session matches and the channel and consumer tag are the same.
+ *
+ * @param psc The subscriptionImpl to compare
+ *
+ * @return equality
+ */
private boolean equals(SubscriptionImpl psc)
{
- return sessionKey.equals(psc.sessionKey)
+ return _sessionKey.equals(psc._sessionKey)
&& psc.channel == channel
&& psc.consumerTag.equals(consumerTag);
}
public int hashCode()
{
- return sessionKey.hashCode();
+ return _sessionKey.hashCode();
}
public String toString()
{
- return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+ String subscriber = "[channel=" + channel +
+ ", consumerTag=" + consumerTag +
+ ", session=" + protocolSession.getKey() +
+ ", resendQueue=" + (_resendQueue != null);
+
+ if (_resendQueue != null)
+ {
+ subscriber += ", resendSize=" + _resendQueue.size();
+ }
+
+
+ return subscriber + "]";
}
/**
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
- * @param msg
- * @param queue
+ * @param msg The message to send
+ * @param queue the Queue it has been sent from
*
* @throws AMQException
*/
@@ -278,7 +306,18 @@ public class SubscriptionImpl implements Subscription
public boolean isSuspended()
{
- return channel.isSuspended();
+ if (_logger.isTraceEnabled())
+ {
+ if (channel.isSuspended())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+ }
+ if (_sendLock.get())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+ }
+ }
+ return channel.isSuspended() || _sendLock.get();
}
/**
@@ -376,11 +415,18 @@ public class SubscriptionImpl implements Subscription
return _messages;
}
- public void enqueueForPreDelivery(AMQMessage msg)
+ public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
{
if (_messages != null)
{
- _messages.offer(msg);
+ if (deliverFirst)
+ {
+ _messages.pushHead(msg);
+ }
+ else
+ {
+ _messages.offer(msg);
+ }
}
}
@@ -391,19 +437,95 @@ public class SubscriptionImpl implements Subscription
public void close()
{
- if (!_closed)
+ synchronized (_sendLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting SendLock true");
+ }
+
+ _sendLock.set(true);
+
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+ }
+
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ requeue();
+ }
+
+ //remove references in PDQ
+ if (_messages != null)
+ {
+ _messages.clear();
+ }
+
+ if (_autoClose && !_sentClose)
{
_logger.info("Closing autoclose subscription:" + this);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
consumerTag // consumerTag
));
- _closed = true;
+ _sentClose = true;
+ }
+ }
+
+ private void requeue()
+ {
+ if (_queue != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Requeuing :" + _resendQueue.size() + " messages");
+ }
+
+ while (!_resendQueue.isEmpty())
+ {
+ AMQMessage resent = _resendQueue.poll();
+
+ resent.release();
+ _queue.subscriberHasPendingResend(false, this, resent);
+
+ try
+ {
+ channel.getTransactionalContext().deliver(resent, _queue, true);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to re-deliver messages", e);
+ }
+ }
+
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null.");
+ }
+
+ _queue.subscriberHasPendingResend(false, this, null);
}
+ else
+ {
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("Unable to re-deliver messages as queue is null.");
+ }
+ }
+
+ // Clear the messages
+ _resendQueue = null;
+ }
+
+
+ public boolean isClosed()
+ {
+ return _sendLock.get(); // This rather than _close is used to signify the subscriber is now closed.
}
public boolean isBrowser()
@@ -416,5 +538,61 @@ public class SubscriptionImpl implements Subscription
return channel.wouldSuspend(msg);
}
+ public Queue<AMQMessage> getResendQueue()
+ {
+ if (_resendQueue == null)
+ {
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ return _resendQueue;
+ }
+
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ return _resendQueue;
+ }
+
+ if (_filters != null)
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ close();
+ return null;
+ }
+ }
+ return _messages;
+ }
+ else // we want the DM queue
+ {
+ return messages;
+ }
+ }
+
+ public void addToResendQueue(AMQMessage msg)
+ {
+ // add to our resend queue
+ getResendQueue().add(msg);
+
+ // Mark Queue has having content.
+ if (_queue == null)
+ {
+ _logger.error("Queue is null won't be able to resend messages");
+ }
+ else
+ {
+ _queue.subscriberHasPendingResend(true, this, msg);
+ }
+ }
+
+ public Object getSendLock()
+ {
+ return _sendLock;
+ }
+
}