summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java623
1 files changed, 0 insertions, 623 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
deleted file mode 100644
index 043caa53ae..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.subscription;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
- * that was given out by the broker and the channel id. <p/>
- */
-public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
-{
-
- private StateListener _stateListener = new StateListener()
- {
-
- public void stateChange(Subscription sub, State oldState, State newState)
- {
-
- }
- };
-
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
- private final ClientDeliveryMethod _deliveryMethod;
- private final RecordDeliveryMethod _recordMethod;
-
- private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
- private final Lock _stateChangeLock;
-
- static final class BrowserSubscription extends SubscriptionImpl
- {
- public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
-
- public boolean isBrowser()
- {
- return true;
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- * @param msg The message to send
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry msg) throws AMQException
- {
- // We don't decrement the reference here as we don't want to consume the message
- // but we do want to send it to the client.
-
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(msg, deliveryTag);
- }
-
- }
-
- @Override
- public boolean wouldSuspend(QueueEntry msg)
- {
- return false;
- }
-
- }
-
- public static class NoAckSubscription extends SubscriptionImpl
- {
- public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
-
- public boolean isBrowser()
- {
- return false;
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- * @param entry The message to send
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry) throws AMQException
- {
-
- StoreContext storeContext = getChannel().getStoreContext();
- try
- {
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- entry.dequeueAndDelete(storeContext);
-
-
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
-
- sendToClient(entry, deliveryTag);
-
- }
- }
- finally
- {
- //Only set delivered if it actually was writen successfully..
- // using a try->finally would set it even if an error occured.
- // Is this what we want?
-
- entry.setDeliveredToSubscription();
- }
- }
-
- @Override
- public boolean wouldSuspend(QueueEntry msg)
- {
- return false;
- }
-
- }
-
- static final class AckSubscription extends SubscriptionImpl
- {
- public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
-
- public boolean isBrowser()
- {
- return false;
- }
-
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- * @param entry The message to send
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry) throws AMQException
- {
-
- try
- { // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
-
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
-
-
- recordMessageDelivery(entry, deliveryTag);
- sendToClient(entry, deliveryTag);
-
-
- }
- }
- finally
- {
- //Only set delivered if it actually was writen successfully..
- // using a try->finally would set it even if an error occured.
- // Is this what we want?
-
- entry.setDeliveredToSubscription();
- }
- }
-
-
- }
-
-
- private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
-
- private final AMQChannel _channel;
-
- private final AMQShortString _consumerTag;
-
-
- private final boolean _noLocal;
-
- private final FlowCreditManager _creditManager;
-
- private FilterManager _filters;
-
- private final Boolean _autoClose;
-
-
- private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
-
- private AMQQueue _queue;
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
-
-
- public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable arguments,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
-
- _channel = channel;
- _consumerTag = consumerTag;
-
- _creditManager = creditManager;
- creditManager.addStateListener(this);
-
- _noLocal = noLocal;
-
-
- _filters = FilterManagerFactory.createManager(arguments);
-
- _deliveryMethod = deliveryMethod;
- _recordMethod = recordMethod;
-
-
- _stateChangeLock = new ReentrantLock();
-
-
- if (arguments != null)
- {
- Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
- if (autoClose != null)
- {
- _autoClose = (Boolean) autoClose;
- }
- else
- {
- _autoClose = false;
- }
- }
- else
- {
- _autoClose = false;
- }
-
- _logger.info(debugIdentity()+" Created subscription:");
- }
-
-
-
- public synchronized void setQueue(AMQQueue queue)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
- }
-
- public String toString()
- {
- String subscriber = "[channel=" + _channel +
- ", consumerTag=" + _consumerTag +
- ", session=" + getProtocolSession().getKey() ;
-
- return subscriber + "]";
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- * @param msg The message to send
- * @throws AMQException
- */
- abstract public void send(QueueEntry msg) throws AMQException;
-
-
- public boolean isSuspended()
- {
- return !isActive() || _channel.isSuspended() || _deleted.get();
- }
-
- /**
- * Callback indicating that a queue has been deleted.
- *
- * @param queue The queue to delete
- */
- public void queueDeleted(AMQQueue queue)
- {
- _deleted.set(true);
-// _channel.queueDeleted(queue);
- }
-
- public boolean filtersMessages()
- {
- return _filters != null || _noLocal;
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
- }
-// return false;
- }
-
-
-
- //todo - client id should be recoreded and this test removed but handled below
- if (_noLocal)
- {
- //todo getPublisherClientInstance should be moved to QueueEntryImpl
- final Object publisherId = entry.getMessage().getPublisherClientInstance();
-
- // We don't want local messages so check to see if message is one we sent
- Object localInstance;
-
- if (publisherId != null && (getProtocolSession().getClientProperties() != null) &&
- (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
- {
- if(publisherId.equals(localInstance))
- {
- return false;
- }
- }
- else
- {
-
- localInstance = getProtocolSession().getClientIdentifier();
- //todo - client id should be recoreded and this test removed but handled here
-
-
- //todo getPublisherIdentifier should be moved to QueueEntryImpl
- if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
- {
- return false;
- }
- }
-
-
- }
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
- }
- return checkFilters(entry);
-
- }
-
- private String id = String.valueOf(System.identityHashCode(this));
-
- private String debugIdentity()
- {
- return id;
- }
-
- private boolean checkFilters(QueueEntry msg)
- {
- return (_filters == null) || _filters.allAllow(msg);
- }
-
- public boolean isAutoClose()
- {
- return _autoClose;
- }
-
- public FlowCreditManager getCreditManager()
- {
- return _creditManager;
- }
-
-
- public void close()
- {
- boolean closed = false;
- State state = getState();
-
- _stateChangeLock.lock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = _state.compareAndSet(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- else
- {
- _stateListener.stateChange(this,state, State.CLOSED);
- }
- }
- _creditManager.removeListener(this);
- }
- finally
- {
- _stateChangeLock.unlock();
- }
-
-
- if (closed)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Called close() on a closed subscription");
- }
-
- return;
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing subscription (" + debugIdentity() + "):" + this);
- }
- }
-
- public boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
-
-
- public boolean wouldSuspend(QueueEntry queueEntry)
- {
- return !_creditManager.useCreditForMessage(queueEntry);
- }
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void resend(final QueueEntry entry) throws AMQException
- {
- _queue.resend(entry, this);
- }
-
- public AMQChannel getChannel()
- {
- return _channel;
- }
-
- public AMQShortString getConsumerTag()
- {
- return _consumerTag;
- }
-
- public AMQProtocolSession getProtocolSession()
- {
- return _channel.getProtocolSession();
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public void restoreCredit(final QueueEntry queueEntry)
- {
- _creditManager.addCredit(1, queueEntry.getSize());
- }
-
-
- public void creditStateChanged(boolean hasCredit)
- {
-
- if(hasCredit)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- else
- {
- // this is a hack to get round the issue of increasing bytes credit
- _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
- }
- }
- else
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- }
-
- public State getState()
- {
- return _state.get();
- }
-
-
- public void setStateListener(final StateListener listener)
- {
- _stateListener = listener;
- }
-
-
- public QueueEntry getLastSeenEntry()
- {
- return _queueContext.get();
- }
-
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
- {
- return _queueContext.compareAndSet(expected,newvalue);
- }
-
-
- protected void sendToClient(final QueueEntry entry, final long deliveryTag)
- throws AMQException
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity());
- }
- _deliveryMethod.deliverToClient(this,entry,deliveryTag);
- }
-
-
- protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
- {
- _recordMethod.recordMessageDelivery(this,entry,deliveryTag);
- }
-
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
-}