diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java | 232 |
1 files changed, 205 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 6bdfeccc0f..ede7731a06 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -21,10 +21,10 @@ package org.apache.qpid.server.queue; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; @@ -37,6 +37,8 @@ import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; +import org.apache.qpid.util.MessageQueue; +import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; /** * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag @@ -52,9 +54,11 @@ public class SubscriptionImpl implements Subscription public final AMQShortString consumerTag; - private final Object sessionKey; + private final Object _sessionKey; - private Queue<AMQMessage> _messages; + private MessageQueue<AMQMessage> _messages; + + private Queue<AMQMessage> _resendQueue; private final boolean _noLocal; @@ -63,20 +67,27 @@ public class SubscriptionImpl implements Subscription private FilterManager _filters; private final boolean _isBrowser; private final Boolean _autoClose; - private boolean _closed = false; + private boolean _sentClose = false; + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private AMQQueue _queue; + private final AtomicBoolean _sendLock = new AtomicBoolean(false); + + public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, + AMQShortString consumerTag, boolean acks, FieldTable filters, + boolean noLocal, AMQQueue queue) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null); } } @@ -84,25 +95,27 @@ public class SubscriptionImpl implements Subscription AMQShortString consumerTag, boolean acks) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null, false); + this(channelId, protocolSession, consumerTag, acks, null, false, null); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) + AMQShortString consumerTag, boolean acks, FieldTable filters, + boolean noLocal, AMQQueue queue) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) - { + { throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); } this.channel = channel; this.protocolSession = protocolSession; this.consumerTag = consumerTag; - sessionKey = protocolSession.getKey(); + _sessionKey = protocolSession.getKey(); _acks = acks; _noLocal = noLocal; + _queue = queue; _filters = FilterManagerFactory.createManager(filters); @@ -145,9 +158,7 @@ public class SubscriptionImpl implements Subscription if (_filters != null) { - _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - - + _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); } else { @@ -169,30 +180,47 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** Equality holds if the session matches and the channel and consumer tag are the same. */ + /** + * Equality holds if the session matches and the channel and consumer tag are the same. + * + * @param psc The subscriptionImpl to compare + * + * @return equality + */ private boolean equals(SubscriptionImpl psc) { - return sessionKey.equals(psc.sessionKey) + return _sessionKey.equals(psc._sessionKey) && psc.channel == channel && psc.consumerTag.equals(consumerTag); } public int hashCode() { - return sessionKey.hashCode(); + return _sessionKey.hashCode(); } public String toString() { - return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; + String subscriber = "[channel=" + channel + + ", consumerTag=" + consumerTag + + ", session=" + protocolSession.getKey() + + ", resendQueue=" + (_resendQueue != null); + + if (_resendQueue != null) + { + subscriber += ", resendSize=" + _resendQueue.size(); + } + + + return subscriber + "]"; } /** * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * - * @param msg - * @param queue + * @param msg The message to send + * @param queue the Queue it has been sent from * * @throws AMQException */ @@ -278,7 +306,18 @@ public class SubscriptionImpl implements Subscription public boolean isSuspended() { - return channel.isSuspended(); + if (_logger.isTraceEnabled()) + { + if (channel.isSuspended()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + } + if (_sendLock.get()) + { + _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + } + } + return channel.isSuspended() || _sendLock.get(); } /** @@ -376,11 +415,18 @@ public class SubscriptionImpl implements Subscription return _messages; } - public void enqueueForPreDelivery(AMQMessage msg) + public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) { if (_messages != null) { - _messages.offer(msg); + if (deliverFirst) + { + _messages.pushHead(msg); + } + else + { + _messages.offer(msg); + } } } @@ -391,19 +437,95 @@ public class SubscriptionImpl implements Subscription public void close() { - if (!_closed) + synchronized (_sendLock) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting SendLock true"); + } + + _sendLock.set(true); + + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + } + + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + requeue(); + } + + //remove references in PDQ + if (_messages != null) + { + _messages.clear(); + } + + if (_autoClose && !_sentClose) { _logger.info("Closing autoclose subscription:" + this); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), + (byte) 8, (byte) 0, // AMQP version (major, minor) consumerTag // consumerTag )); - _closed = true; + _sentClose = true; + } + } + + private void requeue() + { + if (_queue != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Requeuing :" + _resendQueue.size() + " messages"); + } + + while (!_resendQueue.isEmpty()) + { + AMQMessage resent = _resendQueue.poll(); + + resent.release(); + _queue.subscriberHasPendingResend(false, this, resent); + + try + { + channel.getTransactionalContext().deliver(resent, _queue, true); + } + catch (AMQException e) + { + _logger.error("Unable to re-deliver messages", e); + } + } + + if (!_resendQueue.isEmpty()) + { + _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null."); + } + + _queue.subscriberHasPendingResend(false, this, null); } + else + { + if (!_resendQueue.isEmpty()) + { + _logger.error("Unable to re-deliver messages as queue is null."); + } + } + + // Clear the messages + _resendQueue = null; + } + + + public boolean isClosed() + { + return _sendLock.get(); // This rather than _close is used to signify the subscriber is now closed. } public boolean isBrowser() @@ -416,5 +538,61 @@ public class SubscriptionImpl implements Subscription return channel.wouldSuspend(msg); } + public Queue<AMQMessage> getResendQueue() + { + if (_resendQueue == null) + { + _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + } + return _resendQueue; + } + + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + if (_resendQueue != null && !_resendQueue.isEmpty()) + { + return _resendQueue; + } + + if (_filters != null) + { + if (isAutoClose()) + { + if (_messages.isEmpty()) + { + close(); + return null; + } + } + return _messages; + } + else // we want the DM queue + { + return messages; + } + } + + public void addToResendQueue(AMQMessage msg) + { + // add to our resend queue + getResendQueue().add(msg); + + // Mark Queue has having content. + if (_queue == null) + { + _logger.error("Queue is null won't be able to resend messages"); + } + else + { + _queue.subscriberHasPendingResend(true, this, msg); + } + } + + public Object getSendLock() + { + return _sendLock; + } + } |