diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 63 |
1 files changed, 53 insertions, 10 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b15b3f0bfa..fe1cb624e5 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -30,6 +31,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; @@ -86,7 +89,9 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder +public class AMQChannel<T extends AMQProtocolSession<T>> + implements AMQSessionModel<AMQChannel<T>,T>, + AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -140,7 +145,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AMQProtocolSession _session; + private final T _session; private AtomicBoolean _closing = new AtomicBoolean(false); private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); @@ -163,12 +168,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); + private final List<Action<? super AMQChannel<T>>> _taskList = + new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>(); + private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + public AMQChannel(T session, int channelId, MessageStore messageStore) throws AMQException { _session = session; @@ -526,7 +534,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException + MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, + MessageSource.ConsumerAccessRefused { if (tag == null) { @@ -580,7 +589,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { filterManager = new SimpleFilterManager(); } - filterManager.add(new FilterSupport.NoLocalFilter(source)); + final Object connectionReference = getConnectionReference(); + filterManager.add(new MessageFilter() + { + @Override + public boolean matches(final Filterable message) + { + return message.getConnectionReference() != connectionReference; + } + }); } Consumer sub = source.addConsumer(target, @@ -609,6 +626,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _tag2SubscriptionTargetMap.remove(tag); throw e; } + catch (MessageSource.ConsumerAccessRefused e) + { + _tag2SubscriptionTargetMap.remove(tag); + throw e; + } return tag; } @@ -657,6 +679,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F CurrentActor.get().message(_logSubject, operationalLogMessage); unsubscribeAllConsumers(); + + for (Action<? super AMQChannel<T>> task : _taskList) + { + task.performAction(this); + } + + _transaction.rollback(); try @@ -692,9 +721,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F Consumer sub = me.getValue().getConsumer(); - - sub.close(); - + if(sub != null) + { + sub.close(); + } } _tag2SubscriptionTargetMap.clear(); @@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _id; } - public AMQConnectionModel getConnectionModel() + @Override + public T getConnectionModel() { return _session; } @@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(AMQChannel o) { return getId().compareTo(o.getId()); } + @Override + public void addDeleteTask(final Action<? super AMQChannel<T>> task) + { + _taskList.add(task); + } + + @Override + public void removeDeleteTask(final Action<? super AMQChannel<T>> task) + { + _taskList.remove(task); + } + private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>> { |