summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java')
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java255
1 files changed, 41 insertions, 214 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 6a3f5b46e1..149f89fab1 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -22,10 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -41,63 +38,37 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Modified;
import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.AbstractSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-class
- Subscription_1_0 implements Subscription
+class Subscription_1_0 extends AbstractSubscription implements Subscription
{
private SendingLink_1_0 _link;
- private AMQQueue _queue;
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED);
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
- private final long _id;
- private final boolean _acquires;
- private volatile AMQQueue.Context _queueContext;
- private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private ReentrantLock _stateChangeLock = new ReentrantLock();
-
- private boolean _noLocal;
- private FilterManager _filters;
-
private long _deliveryTag = 0L;
- private StateListener _stateListener;
private Binary _transactionId;
- private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer();
- private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
-
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination)
- {
- this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY);
- }
+ private final AMQPDescribedTypeRegistry _typeRegistry;
+ private final SectionEncoder _sectionEncoder;
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires)
+ public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires, FilterManager filters)
{
+ super(filters,Message_1_0.class,link.getSession().getConnectionReference(), acquires, acquires, link.getEndpoint().getName(), false);
_link = link;
- _queue = destination.getQueue();
- _id = getEndpoint().getLocalHandle().longValue();
- _acquires = acquires;
+ _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
+ _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
+ setQueue(destination.getQueue(),false);
+ updateState(State.ACTIVE, State.SUSPENDED);
}
private SendingLinkEndpoint getEndpoint()
@@ -105,89 +76,40 @@ class
return _link.getEndpoint();
}
- public LogActor getLogActor()
- {
- return null; //TODO
- }
-
- public boolean isTransient()
- {
- return true; //TODO
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(final AMQQueue queue, final boolean exclusive)
- {
- //TODO
- }
-
- public void setNoLocal(final boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public long getSubscriptionID()
- {
- return _id;
- }
-
public boolean isSuspended()
{
return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
}
- public boolean hasInterest(final QueueEntry entry)
+ public void close()
{
- if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference())
+ boolean closed = false;
+ State state = getState();
+
+ getSendLock();
+ try
{
- return false;
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ else
+ {
+ getStateListener().stateChanged(this, state, State.CLOSED);
+ }
+ }
}
- else if(!(entry.getMessage() instanceof Message_1_0)
- && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
+ finally
{
- return false;
+ releaseSendLock();
}
- return checkFilters(entry);
-
- }
-
- private boolean checkFilters(final QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
- }
-
- public boolean isClosed()
- {
- return !getEndpoint().isAttached();
}
- public boolean acquires()
- {
- return _acquires;
- }
-
- public boolean seesRequeues()
- {
- // TODO
- return acquires();
- }
-
- public void close()
- {
- getEndpoint().detach();
- }
-
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ protected void doSend(QueueEntry entry, boolean batch) throws AMQException
{
// TODO
send(entry);
@@ -301,7 +223,7 @@ class
}
else
{
- UnsettledAction action = _acquires
+ UnsettledAction action = acquires()
? new DispositionAction(tag, queueEntry)
: new DoNothingAction(tag, queueEntry);
@@ -315,7 +237,7 @@ class
transfer.setState(state);
}
// TODO - need to deal with failure here
- if(_acquires && _transactionId != null)
+ if(acquires() && _transactionId != null)
{
ServerTransaction txn = _link.getTransaction(_transactionId);
if(txn != null)
@@ -352,7 +274,7 @@ class
}
- public void queueDeleted(final AMQQueue queue)
+ public void queueDeleted()
{
//TODO
getEndpoint().setSource(null);
@@ -373,98 +295,33 @@ class
}
}
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
public void suspend()
{
synchronized(_link.getLock())
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ if(updateState(State.ACTIVE, State.SUSPENDED))
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
}
}
}
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void releaseQueueEntry(QueueEntry queueEntryImpl)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- public void onDequeue(final QueueEntry queueEntry)
- {
- //TODO
- }
public void restoreCredit(final QueueEntry queueEntry)
{
//TODO
}
- public void setStateListener(final StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
- public boolean isSessionTransactional()
- {
- return false; //TODO
- }
-
public void queueEmpty()
{
synchronized(_link.getLock())
{
if(_link.drained())
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ if(updateState(State.ACTIVE, State.SUSPENDED))
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED);
}
}
}
@@ -476,9 +333,9 @@ class
{
if(isSuspended() && getEndpoint() != null)
{
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ if(updateState(State.SUSPENDED, State.ACTIVE))
{
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ getStateListener().stateChanged(this, State.SUSPENDED, State.ACTIVE);
}
_transactionId = _link.getTransactionId();
}
@@ -640,16 +497,6 @@ class
}
}
- public FilterManager getFilters()
- {
- return _filters;
- }
-
- public void setFilters(final FilterManager filters)
- {
- _filters = filters;
- }
-
@Override
public AMQSessionModel getSessionModel()
{
@@ -658,20 +505,6 @@ class
}
@Override
- public long getBytesOut()
- {
- // TODO
- return 0;
- }
-
- @Override
- public long getMessagesOut()
- {
- // TODO
- return 0;
- }
-
- @Override
public long getUnacknowledgedBytes()
{
// TODO
@@ -685,10 +518,4 @@ class
return 0;
}
- @Override
- public String getConsumerName()
- {
- //TODO
- return "TODO";
- }
}