summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java63
1 files changed, 53 insertions, 10 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b15b3f0bfa..fe1cb624e5 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -30,6 +31,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
@@ -86,7 +89,9 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel<T extends AMQProtocolSession<T>>
+ implements AMQSessionModel<AMQChannel<T>,T>,
+ AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -140,7 +145,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AMQProtocolSession _session;
+ private final T _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -163,12 +168,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
+ private final List<Action<? super AMQChannel<T>>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ public AMQChannel(T session, int channelId, MessageStore messageStore)
throws AMQException
{
_session = session;
@@ -526,7 +534,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException
+ MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
+ MessageSource.ConsumerAccessRefused
{
if (tag == null)
{
@@ -580,7 +589,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
filterManager = new SimpleFilterManager();
}
- filterManager.add(new FilterSupport.NoLocalFilter(source));
+ final Object connectionReference = getConnectionReference();
+ filterManager.add(new MessageFilter()
+ {
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ return message.getConnectionReference() != connectionReference;
+ }
+ });
}
Consumer sub =
source.addConsumer(target,
@@ -609,6 +626,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_tag2SubscriptionTargetMap.remove(tag);
throw e;
}
+ catch (MessageSource.ConsumerAccessRefused e)
+ {
+ _tag2SubscriptionTargetMap.remove(tag);
+ throw e;
+ }
return tag;
}
@@ -657,6 +679,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
CurrentActor.get().message(_logSubject, operationalLogMessage);
unsubscribeAllConsumers();
+
+ for (Action<? super AMQChannel<T>> task : _taskList)
+ {
+ task.performAction(this);
+ }
+
+
_transaction.rollback();
try
@@ -692,9 +721,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
Consumer sub = me.getValue().getConsumer();
-
- sub.close();
-
+ if(sub != null)
+ {
+ sub.close();
+ }
}
_tag2SubscriptionTargetMap.clear();
@@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return _id;
}
- public AMQConnectionModel getConnectionModel()
+ @Override
+ public T getConnectionModel()
{
return _session;
}
@@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(AMQChannel o)
{
return getId().compareTo(o.getId());
}
+ @Override
+ public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+ {
+ _taskList.add(task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+ {
+ _taskList.remove(task);
+ }
+
private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{