summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java54
1 files changed, 12 insertions, 42 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index d6a256e2e1..d8f44c9f7f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -88,7 +88,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final Lock _stateChangeLock;
- private final long _subscriptionID;
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
private LogSubject _logSubject;
private LogActor _logActor;
private UUID _id;
@@ -102,11 +104,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -150,11 +151,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -211,45 +211,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
- /**
- * NoAck Subscription for use with BasicGet method.
- */
- public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
- {
- public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
- }
-
- public boolean isTransient()
- {
- return true;
- }
-
- public boolean wouldSuspend(QueueEntry msg)
- {
- return !getCreditManager().useCreditForMessage(msg.getMessage());
- }
-
- }
-
static final class AckSubscription extends SubscriptionImpl
{
public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -325,11 +296,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
AMQShortString consumerTag, FieldTable arguments,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- _subscriptionID = subscriptionID;
+
_channel = channel;
_consumerTag = consumerTag;
@@ -475,7 +445,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
//check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
+ if (entry.isRejectedBy(this))
{
if (_logger.isDebugEnabled())
{