summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java94
1 files changed, 55 insertions, 39 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 9df1e7b89b..7e8623c171 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -21,19 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -81,6 +71,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -123,7 +114,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+ private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>();
private final MessageStore _messageStore;
@@ -498,9 +489,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- public Subscription getSubscription(AMQShortString subscription)
+ public Subscription getSubscription(AMQShortString tag)
{
- return _tag2SubscriptionMap.get(subscription);
+ final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+ return target == null ? null : target.getSubscription();
}
/**
@@ -526,34 +518,57 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
- if (_tag2SubscriptionMap.containsKey(tag))
+ if (_tag2SubscriptionTargetMap.containsKey(tag))
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
- Subscription subscription =
- SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+ SubscriptionTarget_0_8 target;
+ EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+ {
+ target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ options.add(Subscription.Option.TRANSIENT);
+ }
+ else if(acks)
+ {
+ target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
+ }
+ else
+ {
+ target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
+ }
+
+ if(exclusive)
+ {
+ options.add(Subscription.Option.EXCLUSIVE);
+ }
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
- _tag2SubscriptionMap.put(tag, subscription);
+ _tag2SubscriptionTargetMap.put(tag, target);
try
{
- queue.registerSubscription(subscription, exclusive);
+ Subscription sub =
+ queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options);
}
catch (AMQException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
catch (RuntimeException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
return tag;
@@ -568,7 +583,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+ SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+ Subscription sub = target == null ? null : target.getSubscription();
if (sub != null)
{
try
@@ -634,7 +650,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
if (_logger.isInfoEnabled())
{
- if (!_tag2SubscriptionMap.isEmpty())
+ if (!_tag2SubscriptionTargetMap.isEmpty())
{
_logger.info("Unsubscribing all consumers on channel " + toString());
}
@@ -644,14 +660,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+ for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
if (_logger.isInfoEnabled())
{
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Subscription sub = me.getValue();
+ Subscription sub = me.getValue().getSubscription();
try
{
@@ -665,7 +681,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- _tag2SubscriptionMap.clear();
+ _tag2SubscriptionTargetMap.clear();
}
/**
@@ -977,9 +993,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getQueue().deliverAsync(s);
+ s.getSubscription().getQueue().deliverAsync(s.getSubscription());
}
}
@@ -993,15 +1009,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (!wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
try
{
- s.getSendLock();
+ s.getSubscription().getSendLock();
}
finally
{
- s.releaseSendLock();
+ s.getSubscription().releaseSendLock();
}
}
}
@@ -1078,10 +1094,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
boolean requiresSuspend = _suspended.compareAndSet(false,true);
// ensure all subscriptions have seen the change to the channel state
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSendLock();
- sub.releaseSendLock();
+ sub.getSubscription().getSendLock();
+ sub.getSubscription().releaseSendLock();
}
try
@@ -1116,9 +1132,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(requiresSuspend)
{
_suspended.set(false);
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getQueue().deliverAsync(sub);
+ sub.getSubscription().getQueue().deliverAsync(sub.getSubscription());
}
}
@@ -1672,6 +1688,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
@Override
public int getConsumerCount()
{
- return _tag2SubscriptionMap.size();
+ return _tag2SubscriptionTargetMap.size();
}
}