diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java | 54 |
1 files changed, 12 insertions, 42 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index d6a256e2e1..d8f44c9f7f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -88,7 +88,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final Lock _stateChangeLock; - private final long _subscriptionID; + private static final AtomicLong idGenerator = new AtomicLong(0); + // Create a simple ID that increments for ever new Subscription + private final long _subscriptionID = idGenerator.getAndIncrement(); private LogSubject _logSubject; private LogActor _logActor; private UUID _id; @@ -102,11 +104,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + RecordDeliveryMethod recordMethod) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } @@ -150,11 +151,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + RecordDeliveryMethod recordMethod) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } @@ -211,45 +211,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - /** - * NoAck Subscription for use with BasicGet method. - */ - public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription - { - public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); - } - - public boolean isTransient() - { - return true; - } - - public boolean wouldSuspend(QueueEntry msg) - { - return !getCreditManager().useCreditForMessage(msg.getMessage()); - } - - } - static final class AckSubscription extends SubscriptionImpl { public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + RecordDeliveryMethod recordMethod) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } @@ -325,11 +296,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage AMQShortString consumerTag, FieldTable arguments, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + RecordDeliveryMethod recordMethod) throws AMQException { - _subscriptionID = subscriptionID; + _channel = channel; _consumerTag = consumerTag; @@ -475,7 +445,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) + if (entry.isRejectedBy(this)) { if (_logger.isDebugEnabled()) { |