summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java299
1 files changed, 193 insertions, 106 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 556b87590c..684d3c2e74 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,8 +22,11 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -32,14 +35,20 @@ import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.StoreContext;
/**
* Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
@@ -59,13 +68,25 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+ private AMQQueue.Context _queueContext;
+
private final ClientDeliveryMethod _deliveryMethod;
private final RecordDeliveryMethod _recordMethod;
-
- private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+
+ private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
private final Lock _stateChangeLock;
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+ private LogSubject _logSubject;
+ private LogActor _logActor;
+
+
static final class BrowserSubscription extends SubscriptionImpl
{
public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
@@ -91,6 +112,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param msg The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry msg) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
@@ -103,6 +125,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
+
+ @Override
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return false;
+ }
+
}
public static class NoAckSubscription extends SubscriptionImpl
@@ -130,42 +159,34 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- StoreContext storeContext = getChannel().getStoreContext();
- try
- { // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ entry.dequeue();
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- entry.dequeue(storeContext);
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
-
- sendToClient(entry, deliveryTag);
+ sendToClient(entry, deliveryTag);
- }
- entry.dispose(storeContext);
}
- finally
- {
- //Only set delivered if it actually was writen successfully..
- // using a try->finally would set it even if an error occured.
- // Is this what we want?
+ entry.dispose();
+
- entry.setDeliveredToSubscription();
- }
}
+ @Override
public boolean wouldSuspend(QueueEntry msg)
{
return false;
@@ -185,6 +206,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+
public boolean isBrowser()
{
return false;
@@ -198,42 +220,34 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{
- try
- { // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
-
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
- recordMessageDelivery(entry, deliveryTag);
- sendToClient(entry, deliveryTag);
+ recordMessageDelivery(entry, deliveryTag);
+ sendToClient(entry, deliveryTag);
- }
- }
- finally
- {
- //Only set delivered if it actually was writen successfully..
- // using a try->finally would set it even if an error occured.
- // Is this what we want?
- entry.setDeliveredToSubscription();
}
}
+
}
@@ -244,7 +258,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final AMQShortString _consumerTag;
- private final boolean _noLocal;
+ private boolean _noLocal;
private final FlowCreditManager _creditManager;
@@ -260,7 +274,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
-
+
public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable arguments,
boolean noLocal, FlowCreditManager creditManager,
@@ -309,13 +323,52 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
- public synchronized void setQueue(AMQQueue queue)
+ public synchronized void setQueue(AMQQueue queue, boolean exclusive)
{
if(getQueue() != null)
{
throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
}
_queue = queue;
+
+ _logSubject = new SubscriptionLogSubject(this);
+ _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
+
+ if (CurrentActor.get().getRootMessageLogger().
+ isMessageEnabled(CurrentActor.get(), _logSubject))
+ {
+ // Get the string value of the filters
+ String filterLogString = null;
+ if (_filters != null && _filters.hasFilters())
+ {
+ filterLogString = _filters.toString();
+ }
+
+ if (isAutoClose())
+ {
+ if (filterLogString == null)
+ {
+ filterLogString = "";
+ }
+ else
+ {
+ filterLogString += ",";
+ }
+ filterLogString += "AutoClose";
+ }
+
+ if (isBrowser())
+ {
+ // We do not need to check for null here as all Browsers are AutoClose
+ filterLogString +=",Browser";
+ }
+
+ CurrentActor.get().
+ message(_logSubject,
+ SubscriptionMessages.SUB_CREATE(filterLogString,
+ queue.isDurable() && exclusive,
+ filterLogString != null));
+ }
}
public String toString()
@@ -360,45 +413,35 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean hasInterest(QueueEntry entry)
{
+
+
+
+
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
+ _logger.debug("Subscription:" + this + " rejected message:" + entry);
}
// return false;
}
-
-
- //todo - client id should be recoreded and this test removed but handled below
if (_noLocal)
{
- final Object publisherId = entry.getMessage().getPublisherClientInstance();
- // We don't want local messages so check to see if message is one we sent
- Object localInstance;
+ AMQMessage message = (AMQMessage) entry.getMessage();
- if (publisherId != null && (getProtocolSession().getClientProperties() != null) &&
- (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
- {
- if(publisherId.equals(localInstance))
- {
- return false;
- }
- }
- else
- {
-
- localInstance = getProtocolSession().getClientIdentifier();
- //todo - client id should be recoreded and this test removed but handled here
+ //todo - client id should be recorded so we don't have to handle
+ // the case where this is null.
+ final Object publisher = message.getPublisherIdentifier();
+ // We don't want local messages so check to see if message is one we sent
+ Object localInstance = getProtocolSession();
- if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
- {
- return false;
- }
+ if(publisher.equals(localInstance))
+ {
+ return false;
}
@@ -407,7 +450,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if (_logger.isDebugEnabled())
{
- _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
+ _logger.debug("(" + this + ") checking filters for message (" + entry);
}
return checkFilters(entry);
@@ -422,7 +465,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private boolean checkFilters(QueueEntry msg)
{
- return (_filters == null) || _filters.allAllow(msg.getMessage());
+ return (_filters == null) || _filters.allAllow(msg);
}
public boolean isAutoClose()
@@ -464,20 +507,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
- if (closed)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Called close() on a closed subscription");
- }
-
- return;
- }
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing subscription (" + debugIdentity() + "):" + this);
- }
+ //Log Subscription closed
+ CurrentActor.get().message(_logSubject, SubscriptionMessages.SUB_CLOSE());
}
public boolean isClosed()
@@ -501,11 +532,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_stateChangeLock.unlock();
}
- public void resend(final QueueEntry entry) throws AMQException
- {
- _queue.resend(entry, this);
- }
-
public AMQChannel getChannel()
{
return _channel;
@@ -516,25 +542,41 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _consumerTag;
}
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
public AMQProtocolSession getProtocolSession()
{
return _channel.getProtocolSession();
}
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
+
public AMQQueue getQueue()
{
- return _queue;
+ return _queue;
+ }
+
+ public void onDequeue(final QueueEntry queueEntry)
+ {
+ restoreCredit(queueEntry);
}
public void restoreCredit(final QueueEntry queueEntry)
{
- _creditManager.addCredit(1, queueEntry.getSize());
+ _creditManager.restoreCredit(1, queueEntry.getSize());
}
+
public void creditStateChanged(boolean hasCredit)
{
-
+
if(hasCredit)
{
if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
@@ -554,6 +596,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
}
}
+ CurrentActor.get().message(_logSubject,SubscriptionMessages.SUB_STATE(_state.get().toString()));
}
public State getState()
@@ -568,14 +611,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return _queueContext.get();
+ return _queueContext;
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
+ public void setQueueContext(AMQQueue.Context context)
{
- return _queueContext.compareAndSet(expected,newvalue);
+ _queueContext = context;
}
@@ -602,4 +645,48 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _owningState;
}
+ public QueueEntry.SubscriptionAssignedState getAssignedState()
+ {
+ return _assignedState;
+ }
+
+
+ public void confirmAutoClose()
+ {
+ ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
+ }
+
+ public boolean acquires()
+ {
+ return !isBrowser();
+ }
+
+ public boolean seesRequeues()
+ {
+ return !isBrowser();
+ }
+
+ public boolean isTransient()
+ {
+ return false;
+ }
+
+ public void set(String key, Object value)
+ {
+ _properties.put(key, value);
+ }
+
+ public Object get(String key)
+ {
+ return _properties.get(key);
+ }
+
+
+ public void setNoLocal(boolean noLocal)
+ {
+ _noLocal = noLocal;
+ }
+
+ abstract boolean isBrowser();
}