/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; /** * Encapsulation of a supscription to a queue.

Ties together the protocol session of a subscriber, the consumer tag * that was given out by the broker and the channel id.

*/ public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener { private StateListener _stateListener = new StateListener() { public void stateChange(Subscription sub, State oldState, State newState) { } }; private final AtomicReference _state = new AtomicReference(State.ACTIVE); private final AtomicReference _queueContext = new AtomicReference(null); private final ClientDeliveryMethod _deliveryMethod; private final RecordDeliveryMethod _recordMethod; private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final Lock _stateChangeLock; private static final AtomicLong idGenerator = new AtomicLong(0); // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); private LogSubject _logSubject; private LogActor _logActor; static final class BrowserSubscription extends SubscriptionImpl { public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException { super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } public boolean isBrowser() { return true; } /** * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * * @param msg The message to send * @throws AMQException */ @Override public void send(QueueEntry msg) throws AMQException { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); sendToClient(msg, deliveryTag); } } @Override public boolean wouldSuspend(QueueEntry msg) { return false; } } public static class NoAckSubscription extends SubscriptionImpl { public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException { super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } public boolean isBrowser() { return false; } /** * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * * @param entry The message to send * @throws AMQException */ @Override public void send(QueueEntry entry) throws AMQException { StoreContext storeContext = getChannel().getStoreContext(); try { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. // By doing this _before_ the send we ensure that it // doesn't get sent if it can't be dequeued, preventing // duplicate delivery on recovery. // The send may of course still fail, in which case, as // the message is unacked, it will be lost. entry.dequeue(storeContext); synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); sendToClient(entry, deliveryTag); } entry.dispose(storeContext); } finally { //Only set delivered if it actually was writen successfully.. // using a try->finally would set it even if an error occured. // Is this what we want? entry.setDeliveredToSubscription(); } } @Override public boolean wouldSuspend(QueueEntry msg) { return false; } } static final class AckSubscription extends SubscriptionImpl { public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException { super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } public boolean isBrowser() { return false; } /** * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * * @param entry The message to send * @throws AMQException */ @Override public void send(QueueEntry entry) throws AMQException { try { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. // By doing this _before_ the send we ensure that it // doesn't get sent if it can't be dequeued, preventing // duplicate delivery on recovery. // The send may of course still fail, in which case, as // the message is unacked, it will be lost. synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); recordMessageDelivery(entry, deliveryTag); sendToClient(entry, deliveryTag); } } finally { //Only set delivered if it actually was writen successfully.. // using a try->finally would set it even if an error occured. // Is this what we want? entry.setDeliveredToSubscription(); } } } private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class); private final AMQChannel _channel; private final AMQShortString _consumerTag; private final boolean _noLocal; private final FlowCreditManager _creditManager; private FilterManager _filters; private final Boolean _autoClose; private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); private AMQQueue _queue; private final AtomicBoolean _deleted = new AtomicBoolean(false); public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable arguments, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException { _channel = channel; _consumerTag = consumerTag; _creditManager = creditManager; creditManager.addStateListener(this); _noLocal = noLocal; _filters = FilterManagerFactory.createManager(arguments); _deliveryMethod = deliveryMethod; _recordMethod = recordMethod; _stateChangeLock = new ReentrantLock(); if (arguments != null) { Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); if (autoClose != null) { _autoClose = (Boolean) autoClose; } else { _autoClose = false; } } else { _autoClose = false; } } public synchronized void setQueue(AMQQueue queue, boolean exclusive) { if(getQueue() != null) { throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); } _queue = queue; _logSubject = new SubscriptionLogSubject(this); _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); if (CurrentActor.get().getRootMessageLogger(). isMessageEnabled(CurrentActor.get(), _logSubject)) { // Get the string value of the filters String filterLogString = null; if (_filters != null && _filters.hasFilters()) { filterLogString = _filters.toString(); } if (isAutoClose()) { if (filterLogString == null) { filterLogString = ""; } else { filterLogString += ","; } filterLogString += "AutoClose"; } if (isBrowser()) { // We do not need to check for null here as all Browsers are AutoClose filterLogString +=",Browser"; } CurrentActor.get(). message(_logSubject, SubscriptionMessages.SUB_1001(filterLogString, queue.isDurable() && exclusive, filterLogString != null)); } } public String toString() { String subscriber = "[channel=" + _channel + ", consumerTag=" + _consumerTag + ", session=" + getProtocolSession().getKey() ; return subscriber + "]"; } /** * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * * @param msg The message to send * @throws AMQException */ abstract public void send(QueueEntry msg) throws AMQException; public boolean isSuspended() { return !isActive() || _channel.isSuspended() || _deleted.get(); } /** * Callback indicating that a queue has been deleted. * * @param queue The queue to delete */ public void queueDeleted(AMQQueue queue) { _deleted.set(true); // _channel.queueDeleted(queue); } public boolean filtersMessages() { return _filters != null || _noLocal; } public boolean hasInterest(QueueEntry entry) { //check that the message hasn't been rejected if (entry.isRejectedBy(this)) { if (_logger.isDebugEnabled()) { _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity()); } // return false; } if (_noLocal) { //todo - client id should be recoreded so we don't have to handle // the case where this is null. final Object publisherId = entry.getMessage().getPublisherClientInstance(); // We don't want local messages so check to see if message is one we sent Object localInstance; if (publisherId != null && (getProtocolSession().getClientProperties() != null) && (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if(publisherId.equals(localInstance)) { return false; } } else { localInstance = getProtocolSession().getClientIdentifier(); //todo - client id should be recoreded so we don't have to do the null check if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier())) { return false; } } } if (_logger.isDebugEnabled()) { _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); } return checkFilters(entry); } private String id = String.valueOf(System.identityHashCode(this)); private String debugIdentity() { return id; } private boolean checkFilters(QueueEntry msg) { return (_filters == null) || _filters.allAllow(msg.getMessage()); } public boolean isAutoClose() { return _autoClose; } public FlowCreditManager getCreditManager() { return _creditManager; } public void close() { boolean closed = false; State state = getState(); _stateChangeLock.lock(); try { while(!closed && state != State.CLOSED) { closed = _state.compareAndSet(state, State.CLOSED); if(!closed) { state = getState(); } else { _stateListener.stateChange(this,state, State.CLOSED); } } _creditManager.removeListener(this); } finally { _stateChangeLock.unlock(); } //Log Subscription closed CurrentActor.get().message(_logSubject, SubscriptionMessages.SUB_1002()); } public boolean isClosed() { return getState() == State.CLOSED; } public boolean wouldSuspend(QueueEntry msg) { return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage()); } public void getSendLock() { _stateChangeLock.lock(); } public void releaseSendLock() { _stateChangeLock.unlock(); } public void resend(final QueueEntry entry) throws AMQException { _queue.resend(entry, this); } public AMQChannel getChannel() { return _channel; } public AMQShortString getConsumerTag() { return _consumerTag; } public long getSubscriptionID() { return _subscriptionID; } public AMQProtocolSession getProtocolSession() { return _channel.getProtocolSession(); } public LogActor getLogActor() { return _logActor; } public AMQQueue getQueue() { return _queue; } public void restoreCredit(final QueueEntry queueEntry) { _creditManager.addCredit(1, queueEntry.getSize()); } public void creditStateChanged(boolean hasCredit) { if(hasCredit) { if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) { _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); } else { // this is a hack to get round the issue of increasing bytes credit _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); } } else { if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) { _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); } } CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_1003(_state.get().toString())); } public State getState() { return _state.get(); } public void setStateListener(final StateListener listener) { _stateListener = listener; } public QueueEntry getLastSeenEntry() { return _queueContext.get(); } public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue) { return _queueContext.compareAndSet(expected,newvalue); } protected void sendToClient(final QueueEntry entry, final long deliveryTag) throws AMQException { _deliveryMethod.deliverToClient(this,entry,deliveryTag); } protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag) { _recordMethod.recordMessageDelivery(this,entry,deliveryTag); } public boolean isActive() { return getState() == State.ACTIVE; } public QueueEntry.SubscriptionAcquiredState getOwningState() { return _owningState; } }