diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java | 299 |
1 files changed, 193 insertions, 106 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 556b87590c..684d3c2e74 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -32,14 +35,20 @@ import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.SubscriptionActor; +import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.StoreContext; /** * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag @@ -59,13 +68,25 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null); + private AMQQueue.Context _queueContext; + private final ClientDeliveryMethod _deliveryMethod; private final RecordDeliveryMethod _recordMethod; - - private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + + private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); + + private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); + private final Lock _stateChangeLock; + private static final AtomicLong idGenerator = new AtomicLong(0); + // Create a simple ID that increments for ever new Subscription + private final long _subscriptionID = idGenerator.getAndIncrement(); + private LogSubject _logSubject; + private LogActor _logActor; + + static final class BrowserSubscription extends SubscriptionImpl { public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, @@ -91,6 +112,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param msg The message to send * @throws AMQException */ + @Override public void send(QueueEntry msg) throws AMQException { // We don't decrement the reference here as we don't want to consume the message @@ -103,6 +125,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } + + @Override + public boolean wouldSuspend(QueueEntry msg) + { + return false; + } + } public static class NoAckSubscription extends SubscriptionImpl @@ -130,42 +159,34 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - StoreContext storeContext = getChannel().getStoreContext(); - try - { // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + entry.dequeue(); - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - entry.dequeue(storeContext); + synchronized (getChannel()) + { + long deliveryTag = getChannel().getNextDeliveryTag(); - synchronized (getChannel()) - { - long deliveryTag = getChannel().getNextDeliveryTag(); - - sendToClient(entry, deliveryTag); + sendToClient(entry, deliveryTag); - } - entry.dispose(storeContext); } - finally - { - //Only set delivered if it actually was writen successfully.. - // using a try->finally would set it even if an error occured. - // Is this what we want? + entry.dispose(); + - entry.setDeliveredToSubscription(); - } } + @Override public boolean wouldSuspend(QueueEntry msg) { return false; @@ -185,6 +206,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } + public boolean isBrowser() { return false; @@ -198,42 +220,34 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { - try - { // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - - synchronized (getChannel()) - { - long deliveryTag = getChannel().getNextDeliveryTag(); + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + synchronized (getChannel()) + { + long deliveryTag = getChannel().getNextDeliveryTag(); - recordMessageDelivery(entry, deliveryTag); - sendToClient(entry, deliveryTag); + recordMessageDelivery(entry, deliveryTag); + sendToClient(entry, deliveryTag); - } - } - finally - { - //Only set delivered if it actually was writen successfully.. - // using a try->finally would set it even if an error occured. - // Is this what we want? - entry.setDeliveredToSubscription(); } } + } @@ -244,7 +258,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final AMQShortString _consumerTag; - private final boolean _noLocal; + private boolean _noLocal; private final FlowCreditManager _creditManager; @@ -260,7 +274,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage - + public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable arguments, boolean noLocal, FlowCreditManager creditManager, @@ -309,13 +323,52 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage - public synchronized void setQueue(AMQQueue queue) + public synchronized void setQueue(AMQQueue queue, boolean exclusive) { if(getQueue() != null) { throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); } _queue = queue; + + _logSubject = new SubscriptionLogSubject(this); + _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); + + if (CurrentActor.get().getRootMessageLogger(). + isMessageEnabled(CurrentActor.get(), _logSubject)) + { + // Get the string value of the filters + String filterLogString = null; + if (_filters != null && _filters.hasFilters()) + { + filterLogString = _filters.toString(); + } + + if (isAutoClose()) + { + if (filterLogString == null) + { + filterLogString = ""; + } + else + { + filterLogString += ","; + } + filterLogString += "AutoClose"; + } + + if (isBrowser()) + { + // We do not need to check for null here as all Browsers are AutoClose + filterLogString +=",Browser"; + } + + CurrentActor.get(). + message(_logSubject, + SubscriptionMessages.SUB_CREATE(filterLogString, + queue.isDurable() && exclusive, + filterLogString != null)); + } } public String toString() @@ -360,45 +413,35 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean hasInterest(QueueEntry entry) { + + + + //check that the message hasn't been rejected if (entry.isRejectedBy(this)) { if (_logger.isDebugEnabled()) { - _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity()); + _logger.debug("Subscription:" + this + " rejected message:" + entry); } // return false; } - - - //todo - client id should be recoreded and this test removed but handled below if (_noLocal) { - final Object publisherId = entry.getMessage().getPublisherClientInstance(); - // We don't want local messages so check to see if message is one we sent - Object localInstance; + AMQMessage message = (AMQMessage) entry.getMessage(); - if (publisherId != null && (getProtocolSession().getClientProperties() != null) && - (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) - { - if(publisherId.equals(localInstance)) - { - return false; - } - } - else - { - - localInstance = getProtocolSession().getClientIdentifier(); - //todo - client id should be recoreded and this test removed but handled here + //todo - client id should be recorded so we don't have to handle + // the case where this is null. + final Object publisher = message.getPublisherIdentifier(); + // We don't want local messages so check to see if message is one we sent + Object localInstance = getProtocolSession(); - if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier())) - { - return false; - } + if(publisher.equals(localInstance)) + { + return false; } @@ -407,7 +450,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if (_logger.isDebugEnabled()) { - _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); + _logger.debug("(" + this + ") checking filters for message (" + entry); } return checkFilters(entry); @@ -422,7 +465,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private boolean checkFilters(QueueEntry msg) { - return (_filters == null) || _filters.allAllow(msg.getMessage()); + return (_filters == null) || _filters.allAllow(msg); } public boolean isAutoClose() @@ -464,20 +507,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - if (closed) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Called close() on a closed subscription"); - } - - return; - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Closing subscription (" + debugIdentity() + "):" + this); - } + //Log Subscription closed + CurrentActor.get().message(_logSubject, SubscriptionMessages.SUB_CLOSE()); } public boolean isClosed() @@ -501,11 +532,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _stateChangeLock.unlock(); } - public void resend(final QueueEntry entry) throws AMQException - { - _queue.resend(entry, this); - } - public AMQChannel getChannel() { return _channel; @@ -516,25 +542,41 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _consumerTag; } + public long getSubscriptionID() + { + return _subscriptionID; + } + public AMQProtocolSession getProtocolSession() { return _channel.getProtocolSession(); } + public LogActor getLogActor() + { + return _logActor; + } + public AMQQueue getQueue() { - return _queue; + return _queue; + } + + public void onDequeue(final QueueEntry queueEntry) + { + restoreCredit(queueEntry); } public void restoreCredit(final QueueEntry queueEntry) { - _creditManager.addCredit(1, queueEntry.getSize()); + _creditManager.restoreCredit(1, queueEntry.getSize()); } + public void creditStateChanged(boolean hasCredit) { - + if(hasCredit) { if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) @@ -554,6 +596,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); } } + CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_STATE(_state.get().toString())); } public State getState() @@ -568,14 +611,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - public QueueEntry getLastSeenEntry() + public AMQQueue.Context getQueueContext() { - return _queueContext.get(); + return _queueContext; } - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue) + public void setQueueContext(AMQQueue.Context context) { - return _queueContext.compareAndSet(expected,newvalue); + _queueContext = context; } @@ -602,4 +645,48 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _owningState; } + public QueueEntry.SubscriptionAssignedState getAssignedState() + { + return _assignedState; + } + + + public void confirmAutoClose() + { + ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); + } + + public boolean acquires() + { + return !isBrowser(); + } + + public boolean seesRequeues() + { + return !isBrowser(); + } + + public boolean isTransient() + { + return false; + } + + public void set(String key, Object value) + { + _properties.put(key, value); + } + + public Object get(String key) + { + return _properties.get(key); + } + + + public void setNoLocal(boolean noLocal) + { + _noLocal = noLocal; + } + + abstract boolean isBrowser(); } |