diff options
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java')
-rw-r--r-- | java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java | 255 |
1 files changed, 41 insertions, 214 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 6a3f5b46e1..149f89fab1 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -22,10 +22,7 @@ package org.apache.qpid.server.protocol.v1_0; import java.nio.ByteBuffer; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; @@ -41,63 +38,37 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.Modified; import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.subscription.AbstractSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; -class - Subscription_1_0 implements Subscription +class Subscription_1_0 extends AbstractSubscription implements Subscription { private SendingLink_1_0 _link; - private AMQQueue _queue; - - private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED); - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - private final long _id; - private final boolean _acquires; - private volatile AMQQueue.Context _queueContext; - private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private ReentrantLock _stateChangeLock = new ReentrantLock(); - - private boolean _noLocal; - private FilterManager _filters; - private long _deliveryTag = 0L; - private StateListener _stateListener; private Binary _transactionId; - private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer() - .registerTransactionLayer() - .registerSecurityLayer(); - private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry); - - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination) - { - this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY); - } + private final AMQPDescribedTypeRegistry _typeRegistry; + private final SectionEncoder _sectionEncoder; - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires) + public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires, FilterManager filters) { + super(filters,Message_1_0.class,link.getSession().getConnectionReference(), acquires, acquires, link.getEndpoint().getName(), false); _link = link; - _queue = destination.getQueue(); - _id = getEndpoint().getLocalHandle().longValue(); - _acquires = acquires; + _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry(); + _sectionEncoder = new SectionEncoderImpl(_typeRegistry); + setQueue(destination.getQueue(),false); + updateState(State.ACTIVE, State.SUSPENDED); } private SendingLinkEndpoint getEndpoint() @@ -105,89 +76,40 @@ class return _link.getEndpoint(); } - public LogActor getLogActor() - { - return null; //TODO - } - - public boolean isTransient() - { - return true; //TODO - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(final AMQQueue queue, final boolean exclusive) - { - //TODO - } - - public void setNoLocal(final boolean noLocal) - { - _noLocal = noLocal; - } - - public long getSubscriptionID() - { - return _id; - } - public boolean isSuspended() { return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend(); } - public boolean hasInterest(final QueueEntry entry) + public void close() { - if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference()) + boolean closed = false; + State state = getState(); + + getSendLock(); + try { - return false; + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + else + { + getStateListener().stateChanged(this, state, State.CLOSED); + } + } } - else if(!(entry.getMessage() instanceof Message_1_0) - && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) + finally { - return false; + releaseSendLock(); } - return checkFilters(entry); - - } - - private boolean checkFilters(final QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - public boolean isClosed() - { - return !getEndpoint().isAttached(); } - public boolean acquires() - { - return _acquires; - } - - public boolean seesRequeues() - { - // TODO - return acquires(); - } - - public void close() - { - getEndpoint().detach(); - } - - public void send(QueueEntry entry, boolean batch) throws AMQException + protected void doSend(QueueEntry entry, boolean batch) throws AMQException { // TODO send(entry); @@ -301,7 +223,7 @@ class } else { - UnsettledAction action = _acquires + UnsettledAction action = acquires() ? new DispositionAction(tag, queueEntry) : new DoNothingAction(tag, queueEntry); @@ -315,7 +237,7 @@ class transfer.setState(state); } // TODO - need to deal with failure here - if(_acquires && _transactionId != null) + if(acquires() && _transactionId != null) { ServerTransaction txn = _link.getTransaction(_transactionId); if(txn != null) @@ -352,7 +274,7 @@ class } - public void queueDeleted(final AMQQueue queue) + public void queueDeleted() { //TODO getEndpoint().setSource(null); @@ -373,98 +295,33 @@ class } } - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } public void suspend() { synchronized(_link.getLock()) { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + if(updateState(State.ACTIVE, State.SUSPENDED)) { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED); } } } - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void releaseQueueEntry(QueueEntry queueEntryImpl) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - - public void onDequeue(final QueueEntry queueEntry) - { - //TODO - } public void restoreCredit(final QueueEntry queueEntry) { //TODO } - public void setStateListener(final StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - public boolean isSessionTransactional() - { - return false; //TODO - } - public void queueEmpty() { synchronized(_link.getLock()) { if(_link.drained()) { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + if(updateState(State.ACTIVE, State.SUSPENDED)) { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + getStateListener().stateChanged(this, State.ACTIVE, State.SUSPENDED); } } } @@ -476,9 +333,9 @@ class { if(isSuspended() && getEndpoint() != null) { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) + if(updateState(State.SUSPENDED, State.ACTIVE)) { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + getStateListener().stateChanged(this, State.SUSPENDED, State.ACTIVE); } _transactionId = _link.getTransactionId(); } @@ -640,16 +497,6 @@ class } } - public FilterManager getFilters() - { - return _filters; - } - - public void setFilters(final FilterManager filters) - { - _filters = filters; - } - @Override public AMQSessionModel getSessionModel() { @@ -658,20 +505,6 @@ class } @Override - public long getBytesOut() - { - // TODO - return 0; - } - - @Override - public long getMessagesOut() - { - // TODO - return 0; - } - - @Override public long getUnacknowledgedBytes() { // TODO @@ -685,10 +518,4 @@ class return 0; } - @Override - public String getConsumerName() - { - //TODO - return "TODO"; - } } |