diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java | 623 |
1 files changed, 0 insertions, 623 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java deleted file mode 100644 index 043caa53ae..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ /dev/null @@ -1,623 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.subscription; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.filter.FilterManagerFactory; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.StoreContext; - -/** - * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag - * that was given out by the broker and the channel id. <p/> - */ -public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener -{ - - private StateListener _stateListener = new StateListener() - { - - public void stateChange(Subscription sub, State oldState, State newState) - { - - } - }; - - - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null); - private final ClientDeliveryMethod _deliveryMethod; - private final RecordDeliveryMethod _recordMethod; - - private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - private final Lock _stateChangeLock; - - static final class BrowserSubscription extends SubscriptionImpl - { - public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - - public boolean isBrowser() - { - return true; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * @param msg The message to send - * @throws AMQException - */ - @Override - public void send(QueueEntry msg) throws AMQException - { - // We don't decrement the reference here as we don't want to consume the message - // but we do want to send it to the client. - - synchronized (getChannel()) - { - long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(msg, deliveryTag); - } - - } - - @Override - public boolean wouldSuspend(QueueEntry msg) - { - return false; - } - - } - - public static class NoAckSubscription extends SubscriptionImpl - { - public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - - public boolean isBrowser() - { - return false; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * @param entry The message to send - * @throws AMQException - */ - @Override - public void send(QueueEntry entry) throws AMQException - { - - StoreContext storeContext = getChannel().getStoreContext(); - try - { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. - - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. - - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - entry.dequeueAndDelete(storeContext); - - - synchronized (getChannel()) - { - long deliveryTag = getChannel().getNextDeliveryTag(); - - sendToClient(entry, deliveryTag); - - } - } - finally - { - //Only set delivered if it actually was writen successfully.. - // using a try->finally would set it even if an error occured. - // Is this what we want? - - entry.setDeliveredToSubscription(); - } - } - - @Override - public boolean wouldSuspend(QueueEntry msg) - { - return false; - } - - } - - static final class AckSubscription extends SubscriptionImpl - { - public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - - public boolean isBrowser() - { - return false; - } - - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * @param entry The message to send - * @throws AMQException - */ - @Override - public void send(QueueEntry entry) throws AMQException - { - - try - { // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. - - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. - - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - - synchronized (getChannel()) - { - long deliveryTag = getChannel().getNextDeliveryTag(); - - - recordMessageDelivery(entry, deliveryTag); - sendToClient(entry, deliveryTag); - - - } - } - finally - { - //Only set delivered if it actually was writen successfully.. - // using a try->finally would set it even if an error occured. - // Is this what we want? - - entry.setDeliveredToSubscription(); - } - } - - - } - - - private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class); - - private final AMQChannel _channel; - - private final AMQShortString _consumerTag; - - - private final boolean _noLocal; - - private final FlowCreditManager _creditManager; - - private FilterManager _filters; - - private final Boolean _autoClose; - - - private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); - - private AMQQueue _queue; - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - - - - public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable arguments, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - - _channel = channel; - _consumerTag = consumerTag; - - _creditManager = creditManager; - creditManager.addStateListener(this); - - _noLocal = noLocal; - - - _filters = FilterManagerFactory.createManager(arguments); - - _deliveryMethod = deliveryMethod; - _recordMethod = recordMethod; - - - _stateChangeLock = new ReentrantLock(); - - - if (arguments != null) - { - Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); - if (autoClose != null) - { - _autoClose = (Boolean) autoClose; - } - else - { - _autoClose = false; - } - } - else - { - _autoClose = false; - } - - _logger.info(debugIdentity()+" Created subscription:"); - } - - - - public synchronized void setQueue(AMQQueue queue) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - } - - public String toString() - { - String subscriber = "[channel=" + _channel + - ", consumerTag=" + _consumerTag + - ", session=" + getProtocolSession().getKey() ; - - return subscriber + "]"; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * @param msg The message to send - * @throws AMQException - */ - abstract public void send(QueueEntry msg) throws AMQException; - - - public boolean isSuspended() - { - return !isActive() || _channel.isSuspended() || _deleted.get(); - } - - /** - * Callback indicating that a queue has been deleted. - * - * @param queue The queue to delete - */ - public void queueDeleted(AMQQueue queue) - { - _deleted.set(true); -// _channel.queueDeleted(queue); - } - - public boolean filtersMessages() - { - return _filters != null || _noLocal; - } - - public boolean hasInterest(QueueEntry entry) - { - //check that the message hasn't been rejected - if (entry.isRejectedBy(this)) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity()); - } -// return false; - } - - - - //todo - client id should be recoreded and this test removed but handled below - if (_noLocal) - { - //todo getPublisherClientInstance should be moved to QueueEntryImpl - final Object publisherId = entry.getMessage().getPublisherClientInstance(); - - // We don't want local messages so check to see if message is one we sent - Object localInstance; - - if (publisherId != null && (getProtocolSession().getClientProperties() != null) && - (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) - { - if(publisherId.equals(localInstance)) - { - return false; - } - } - else - { - - localInstance = getProtocolSession().getClientIdentifier(); - //todo - client id should be recoreded and this test removed but handled here - - - //todo getPublisherIdentifier should be moved to QueueEntryImpl - if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier())) - { - return false; - } - } - - - } - - - if (_logger.isDebugEnabled()) - { - _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); - } - return checkFilters(entry); - - } - - private String id = String.valueOf(System.identityHashCode(this)); - - private String debugIdentity() - { - return id; - } - - private boolean checkFilters(QueueEntry msg) - { - return (_filters == null) || _filters.allAllow(msg); - } - - public boolean isAutoClose() - { - return _autoClose; - } - - public FlowCreditManager getCreditManager() - { - return _creditManager; - } - - - public void close() - { - boolean closed = false; - State state = getState(); - - _stateChangeLock.lock(); - try - { - while(!closed && state != State.CLOSED) - { - closed = _state.compareAndSet(state, State.CLOSED); - if(!closed) - { - state = getState(); - } - else - { - _stateListener.stateChange(this,state, State.CLOSED); - } - } - _creditManager.removeListener(this); - } - finally - { - _stateChangeLock.unlock(); - } - - - if (closed) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Called close() on a closed subscription"); - } - - return; - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Closing subscription (" + debugIdentity() + "):" + this); - } - } - - public boolean isClosed() - { - return getState() == State.CLOSED; - } - - - public boolean wouldSuspend(QueueEntry queueEntry) - { - return !_creditManager.useCreditForMessage(queueEntry); - } - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void resend(final QueueEntry entry) throws AMQException - { - _queue.resend(entry, this); - } - - public AMQChannel getChannel() - { - return _channel; - } - - public AMQShortString getConsumerTag() - { - return _consumerTag; - } - - public AMQProtocolSession getProtocolSession() - { - return _channel.getProtocolSession(); - } - - public AMQQueue getQueue() - { - return _queue; - } - - public void restoreCredit(final QueueEntry queueEntry) - { - _creditManager.addCredit(1, queueEntry.getSize()); - } - - - public void creditStateChanged(boolean hasCredit) - { - - if(hasCredit) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - else - { - // this is a hack to get round the issue of increasing bytes credit - _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); - } - } - else - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - } - - public State getState() - { - return _state.get(); - } - - - public void setStateListener(final StateListener listener) - { - _stateListener = listener; - } - - - public QueueEntry getLastSeenEntry() - { - return _queueContext.get(); - } - - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue) - { - return _queueContext.compareAndSet(expected,newvalue); - } - - - protected void sendToClient(final QueueEntry entry, final long deliveryTag) - throws AMQException - { - if(_logger.isDebugEnabled()) - { - _logger.debug("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity()); - } - _deliveryMethod.deliverToClient(this,entry,deliveryTag); - } - - - protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag) - { - _recordMethod.recordMessageDelivery(this,entry,deliveryTag); - } - - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - -} |