diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 94 |
1 files changed, 55 insertions, 39 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 9df1e7b89b..7e8623c171 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -21,19 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -42,6 +30,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -55,6 +44,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogActor; @@ -81,6 +71,7 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -123,7 +114,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); + private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>(); private final MessageStore _messageStore; @@ -498,9 +489,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - public Subscription getSubscription(AMQShortString subscription) + public Subscription getSubscription(AMQShortString tag) { - return _tag2SubscriptionMap.get(subscription); + final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); + return target == null ? null : target.getSubscription(); } /** @@ -526,34 +518,57 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F tag = new AMQShortString("sgen_" + getNextConsumerTag()); } - if (_tag2SubscriptionMap.containsKey(tag)) + if (_tag2SubscriptionTargetMap.containsKey(tag)) { throw new AMQException("Consumer already exists with same tag: " + tag); } - Subscription subscription = - SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); + SubscriptionTarget_0_8 target; + EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); + if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) + { + target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + options.add(Subscription.Option.TRANSIENT); + } + else if(acks) + { + target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager); + options.add(Subscription.Option.ACQUIRES); + options.add(Subscription.Option.SEES_REQUEUES); + } + else + { + target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + options.add(Subscription.Option.ACQUIRES); + options.add(Subscription.Option.SEES_REQUEUES); + } + + if(exclusive) + { + options.add(Subscription.Option.EXCLUSIVE); + } // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. // We add before we register as the Async Delivery process may AutoClose the subscriber // so calling _cT2QM.remove before we have done put which was after the register succeeded. // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. - _tag2SubscriptionMap.put(tag, subscription); + _tag2SubscriptionTargetMap.put(tag, target); try { - queue.registerSubscription(subscription, exclusive); + Subscription sub = + queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options); } catch (AMQException e) { - _tag2SubscriptionMap.remove(tag); + _tag2SubscriptionTargetMap.remove(tag); throw e; } catch (RuntimeException e) { - _tag2SubscriptionMap.remove(tag); + _tag2SubscriptionTargetMap.remove(tag); throw e; } return tag; @@ -568,7 +583,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException { - Subscription sub = _tag2SubscriptionMap.remove(consumerTag); + SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); + Subscription sub = target == null ? null : target.getSubscription(); if (sub != null) { try @@ -634,7 +650,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { if (_logger.isInfoEnabled()) { - if (!_tag2SubscriptionMap.isEmpty()) + if (!_tag2SubscriptionTargetMap.isEmpty()) { _logger.info("Unsubscribing all consumers on channel " + toString()); } @@ -644,14 +660,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) + for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet()) { if (_logger.isInfoEnabled()) { _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Subscription sub = me.getValue(); + Subscription sub = me.getValue().getSubscription(); try { @@ -665,7 +681,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - _tag2SubscriptionMap.clear(); + _tag2SubscriptionTargetMap.clear(); } /** @@ -977,9 +993,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (wasSuspended) { // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) + for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { - s.getQueue().deliverAsync(s); + s.getSubscription().getQueue().deliverAsync(s.getSubscription()); } } @@ -993,15 +1009,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (!wasSuspended) { // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) + for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { try { - s.getSendLock(); + s.getSubscription().getSendLock(); } finally { - s.releaseSendLock(); + s.getSubscription().releaseSendLock(); } } } @@ -1078,10 +1094,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F boolean requiresSuspend = _suspended.compareAndSet(false,true); // ensure all subscriptions have seen the change to the channel state - for(Subscription sub : _tag2SubscriptionMap.values()) + for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getSendLock(); - sub.releaseSendLock(); + sub.getSubscription().getSendLock(); + sub.getSubscription().releaseSendLock(); } try @@ -1116,9 +1132,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(requiresSuspend) { _suspended.set(false); - for(Subscription sub : _tag2SubscriptionMap.values()) + for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getQueue().deliverAsync(sub); + sub.getSubscription().getQueue().deliverAsync(sub.getSubscription()); } } @@ -1672,6 +1688,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F @Override public int getConsumerCount() { - return _tag2SubscriptionMap.size(); + return _tag2SubscriptionTargetMap.size(); } } |