summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-05 10:29:55 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-05 10:29:55 +0000
commit7a54b9a25cf96675325a8cb6bfd1d2e4f43b8edd (patch)
tree0649e5d5fc37c77d4e6d79249d9a3e92be7c0f03
parente993cb9572e57416b1a48cb64e9d4b90b2f16ea2 (diff)
downloadqpid-python-7a54b9a25cf96675325a8cb6bfd1d2e4f43b8edd.tar.gz
rename subscription to consumer
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564703 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java)16
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java)10
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java)10
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java1
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java30
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java31
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java54
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java29
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java46
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedSubscriptionMessageGroupManager.java)30
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java50
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java16
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java14
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java)82
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/queue/SubscriptionList.java)86
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java42
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java318
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java2
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java7
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java)21
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java)2
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java2
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java6
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java445
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java40
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java12
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java41
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java302
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java442
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (renamed from java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java)51
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java13
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java11
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java7
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java4
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java25
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java38
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java81
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (renamed from java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java)108
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java12
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java29
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java102
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java14
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java4
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (renamed from java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java)31
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java39
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java (renamed from java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java)2
55 files changed, 1398 insertions, 1399 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 4d9550907e..aa721e598a 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -18,20 +18,20 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.consumer;
import org.apache.qpid.server.util.StateChangeListener;
import java.util.concurrent.atomic.AtomicReference;
-public abstract class AbstractSubscriptionTarget implements SubscriptionTarget
+public abstract class AbstractConsumerTarget implements ConsumerTarget
{
private final AtomicReference<State> _state;
- private final AtomicReference<StateChangeListener<SubscriptionTarget, State>> _stateListener =
- new AtomicReference<StateChangeListener<SubscriptionTarget, State>>();
+ private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener =
+ new AtomicReference<StateChangeListener<ConsumerTarget, State>>();
- protected AbstractSubscriptionTarget(final State initialState)
+ protected AbstractConsumerTarget(final State initialState)
{
_state = new AtomicReference<State>(initialState);
}
@@ -46,7 +46,7 @@ public abstract class AbstractSubscriptionTarget implements SubscriptionTarget
{
if(_state.compareAndSet(from, to))
{
- StateChangeListener<SubscriptionTarget, State> listener = _stateListener.get();
+ StateChangeListener<ConsumerTarget, State> listener = _stateListener.get();
if(listener != null)
{
listener.stateChanged(this, from, to);
@@ -60,12 +60,12 @@ public abstract class AbstractSubscriptionTarget implements SubscriptionTarget
}
- public final void setStateListener(StateChangeListener<SubscriptionTarget, State> listener)
+ public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener)
{
_stateListener.set(listener);
}
- public final StateChangeListener<SubscriptionTarget, State> getStateListener()
+ public final StateChangeListener<ConsumerTarget, State> getStateListener()
{
return _stateListener.get();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
index 572b076ba2..4c1ebc3547 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.consumer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.AMQException;
@@ -27,7 +27,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.StateChangeListener;
-public interface Subscription
+public interface Consumer
{
AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
@@ -63,11 +63,11 @@ public interface Subscription
AMQSessionModel getSessionModel();
- MessageInstance.SubscriptionAcquiredState getOwningState();
+ MessageInstance.ConsumerAcquiredState getOwningState();
void setNoLocal(boolean noLocal);
- long getSubscriptionID();
+ long getId();
boolean isSuspended();
@@ -101,7 +101,7 @@ public interface Subscription
void restoreCredit(final MessageInstance queueEntry);
- void setStateListener(final StateChangeListener<? extends Subscription, State> listener);
+ void setStateListener(final StateChangeListener<? extends Consumer, State> listener);
public State getState();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 80298cccc8..92579475ed 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.consumer;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageInstance;
@@ -26,7 +26,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.StateChangeListener;
-public interface SubscriptionTarget
+public interface ConsumerTarget
{
@@ -37,11 +37,11 @@ public interface SubscriptionTarget
State getState();
- void subscriptionRegistered(Subscription sub);
+ void consumerAdded(Consumer sub);
- void subscriptionRemoved(Subscription sub);
+ void consumerRemoved(Consumer sub);
- void setStateListener(StateChangeListener<SubscriptionTarget, State> listener);
+ void setStateListener(StateChangeListener<ConsumerTarget, State> listener);
long getUnacknowledgedBytes();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
index 6251471139..91d9ef7dbc 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -35,7 +35,6 @@ import org.apache.qpid.server.logging.LogSubject;
* 2) We can set new actors at the point we have enough information. i.e.
* - Set a low level ConnectionActor when processing bytes from the wire.
* - Set a ChannelActor when we are processing the frame
- * - Set a SubscriptionActor when we are handling the subscription.
* <p/>
* The code performing the logging need not worry about what type of actor is
* currently set so can perform its logging. The resulting log entry though will
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 80fa93c417..733fded846 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -25,7 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -49,25 +49,25 @@ public interface MessageInstance
boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
- boolean acquiredBySubscription();
+ boolean acquiredByConsumer();
- boolean isAcquiredBy(Subscription subscription);
+ boolean isAcquiredBy(Consumer consumer);
void setRedelivered();
boolean isRedelivered();
- Subscription getDeliveredSubscription();
+ Consumer getDeliveredConsumer();
void reject();
- boolean isRejectedBy(Subscription subscription);
+ boolean isRejectedBy(Consumer consumer);
boolean getDeliveredToConsumer();
boolean expired() throws AMQException;
- boolean acquire(Subscription sub);
+ boolean acquire(Consumer sub);
int getMaximumDeliveryCount();
@@ -148,7 +148,7 @@ public interface MessageInstance
}
}
- public final class NonSubscriptionAcquiredState extends EntryState
+ public final class NonConsumerAcquiredState extends EntryState
{
public State getState()
{
@@ -161,13 +161,13 @@ public interface MessageInstance
}
}
- public final class SubscriptionAcquiredState extends EntryState
+ public final class ConsumerAcquiredState extends EntryState
{
- private final Subscription _subscription;
+ private final Consumer _consumer;
- public SubscriptionAcquiredState(Subscription subscription)
+ public ConsumerAcquiredState(Consumer consumer)
{
- _subscription = subscription;
+ _consumer = consumer;
}
@@ -176,14 +176,14 @@ public interface MessageInstance
return State.ACQUIRED;
}
- public Subscription getSubscription()
+ public Consumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
public String toString()
{
- return "{" + getState().name() + " : " + _subscription +"}";
+ return "{" + getState().name() + " : " + _consumer +"}";
}
}
@@ -191,7 +191,7 @@ public interface MessageInstance
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
final static EntryState DEQUEUED_STATE = new DequeuedState();
- final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+ final static EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState();
boolean isAvailable();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index e813d0c129..355a1cf3b1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -77,6 +77,6 @@ public interface Session extends ConfiguredObject
CHANNEL_ID,
PRODUCER_FLOW_BLOCKED));
- Collection<Consumer> getSubscriptions();
+ Collection<Consumer> getConsumers();
Collection<Publisher> getPublishers();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
index 850a6c9d80..cf6874030b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
@@ -22,33 +22,32 @@ package org.apache.qpid.server.model.adapter;
import java.util.Map;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
-public class ConsumerAdapter extends AbstractAdapter implements Consumer
+public class ConsumerAdapter extends AbstractAdapter implements org.apache.qpid.server.model.Consumer
{
- private final Subscription _subscription;
+ private final Consumer _consumer;
private final QueueAdapter _queue;
private final SessionAdapter _session;
private final ConsumerStatistics _statistics;
public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter,
- final Subscription subscription)
+ final Consumer consumer)
{
super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(),
queueAdapter.getName(),
- subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
- String.valueOf(subscription.getSessionModel().getChannelId()),
- subscription.getName()), queueAdapter.getTaskExecutor());
- _subscription = subscription;
+ consumer.getSessionModel().getConnectionModel().getRemoteAddressString(),
+ String.valueOf(consumer.getSessionModel().getChannelId()),
+ consumer.getName()), queueAdapter.getTaskExecutor());
+ _consumer = consumer;
_queue = queueAdapter;
_session = sessionAdapter;
_statistics = new ConsumerStatistics();
@@ -57,7 +56,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
public String getName()
{
- return _subscription.getName();
+ return _consumer.getName();
}
public String setName(final String currentName, final String desiredName)
@@ -107,7 +106,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
@Override
public Collection<String> getAttributeNames()
{
- return Consumer.AVAILABLE_ATTRIBUTES;
+ return org.apache.qpid.server.model.Consumer.AVAILABLE_ATTRIBUTES;
}
@Override
@@ -147,7 +146,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
}
else if(DISTRIBUTION_MODE.equals(name))
{
- return _subscription.acquires() ? "MOVE" : "COPY";
+ return _consumer.acquires() ? "MOVE" : "COPY";
}
else if(SETTLEMENT_MODE.equals(name))
{
@@ -197,11 +196,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
{
if(name.equals(BYTES_OUT))
{
- return _subscription.getBytesOut();
+ return _consumer.getBytesOut();
}
else if(name.equals(MESSAGES_OUT))
{
- return _subscription.getMessagesOut();
+ return _consumer.getMessagesOut();
}
else if(name.equals(STATE_CHANGED))
{
@@ -209,11 +208,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer
}
else if(name.equals(UNACKNOWLEDGED_BYTES))
{
- return _subscription.getUnacknowledgedBytes();
+ return _consumer.getUnacknowledgedBytes();
}
else if(name.equals(UNACKNOWLEDGED_MESSAGES))
{
- return _subscription.getUnacknowledgedMessages();
+ return _consumer.getUnacknowledgedMessages();
}
return null; // TODO - Implement
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 074f7c243b..d59b13902b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -35,7 +35,6 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -47,10 +46,11 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.MapValueConverter;
-final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
+final class QueueAdapter extends AbstractAdapter implements Queue,
+ AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener
{
@SuppressWarnings("serial")
static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
@@ -69,8 +69,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
private final AMQQueue _queue;
private final Map<Binding, BindingAdapter> _bindingAdapters =
new HashMap<Binding, BindingAdapter>();
- private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters =
- new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>();
+ private Map<Consumer, ConsumerAdapter> _consumerAdapters =
+ new HashMap<Consumer, ConsumerAdapter>();
private final VirtualHostAdapter _vhost;
@@ -84,7 +84,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
_queue = queue;
- _queue.addSubscriptionRegistrationListener(this);
+ _queue.addConsumerRegistrationListener(this);
populateConsumers();
_statistics = new QueueStatisticsAdapter(queue);
_queue.setNotificationListener(this);
@@ -124,21 +124,21 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
private void populateConsumers()
{
- Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
+ Collection<Consumer> actualConsumers = _queue.getConsumers();
synchronized (_consumerAdapters)
{
- Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator();
- for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions)
+ Iterator<Consumer> iter = _consumerAdapters.keySet().iterator();
+ for(Consumer consumer : actualConsumers)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -153,11 +153,11 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
}
- public Collection<Consumer> getConsumers()
+ public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
synchronized (_consumerAdapters)
{
- return new ArrayList<Consumer>(_consumerAdapters.values());
+ return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
}
}
@@ -502,7 +502,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
return (Collection<C>) getConsumers();
}
@@ -587,19 +587,19 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
return _queue;
}
- public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerAdded(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -609,20 +609,20 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
}
- public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerRemoved(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- adapter = _consumerAdapters.remove(subscription);
+ adapter = _consumerAdapters.remove(consumer);
}
if(adapter != null)
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
if (sessionAdapter != null)
{ // Unregister ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionUnregistered(subscription);
+ sessionAdapter.consumerUnregistered(consumer);
}
childRemoved(adapter);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 31ce7e56fd..6b76eeefa0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -34,9 +34,8 @@ import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -47,7 +46,7 @@ final class SessionAdapter extends AbstractAdapter implements Session
private AMQSessionModel _session;
private SessionStatistics _statistics;
- private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>();
+ private Map<Consumer, ConsumerAdapter> _consumerAdapters = new HashMap<Consumer, ConsumerAdapter>();
public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
{
@@ -56,11 +55,11 @@ final class SessionAdapter extends AbstractAdapter implements Session
_statistics = new SessionStatistics();
}
- public Collection<Consumer> getSubscriptions()
+ public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
synchronized (_consumerAdapters)
{
- return new ArrayList<Consumer>(_consumerAdapters.values());
+ return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
}
}
@@ -119,29 +118,29 @@ final class SessionAdapter extends AbstractAdapter implements Session
}
/**
- * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
- * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * Register a ConsumerAdapter with this Session keyed by the Consumer.
+ * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
* @param adapter the registered ConsumerAdapter.
*/
- void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter)
+ void consumerRegistered(Consumer consumer, ConsumerAdapter adapter)
{
synchronized (_consumerAdapters)
{
- _consumerAdapters.put(subscription, adapter);
+ _consumerAdapters.put(consumer, adapter);
}
childAdded(adapter);
}
/**
- * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
- * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * Unregister a ConsumerAdapter with this Session keyed by the Consumer.
+ * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
*/
- void subscriptionUnregistered(Subscription subscription)
+ void consumerUnregistered(Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- adapter = _consumerAdapters.remove(subscription);
+ adapter = _consumerAdapters.remove(consumer);
}
if (adapter != null)
{
@@ -188,9 +187,9 @@ final class SessionAdapter extends AbstractAdapter implements Session
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
- return (Collection<C>) getSubscriptions();
+ return (Collection<C>) getConsumers();
}
else if(clazz == Publisher.class)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 8cd8a3edee..018ba454e4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -31,8 +31,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -83,27 +83,27 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
VirtualHost getVirtualHost();
- Subscription registerSubscription(final SubscriptionTarget target, final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName, EnumSet<Subscription.Option> options) throws AMQException;
+ Consumer addConsumer(final ConsumerTarget target, final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
- Collection<Subscription> getConsumers();
+ Collection<Consumer> getConsumers();
- interface SubscriptionRegistrationListener
+ interface ConsumerRegistrationListener
{
- void subscriptionRegistered(AMQQueue queue, Subscription subscription);
- void subscriptionUnregistered(AMQQueue queue, Subscription subscription);
+ void consumerAdded(AMQQueue queue, Consumer consumer);
+ void consumerRemoved(AMQQueue queue, Consumer consumer);
}
- void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
- void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
+ void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
+ void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
int getConsumerCount();
int getActiveConsumerCount();
- boolean hasExclusiveSubscriber();
+ boolean hasExclusiveConsumer();
boolean isUnused();
@@ -126,11 +126,11 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
void requeue(QueueEntry entry);
- void dequeue(QueueEntry entry, Subscription sub);
+ void dequeue(QueueEntry entry, Consumer sub);
void decrementUnackedMsgCount(QueueEntry queueEntry);
- boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+ boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException;
void addQueueDeleteTask(Action<AMQQueue> task);
void removeQueueDeleteTask(Action<AMQQueue> task);
@@ -209,7 +209,7 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
Set<NotificationCheck> getNotificationChecks();
- void flushSubscription(final Subscription sub) throws AMQException;
+ void flushConsumer(final Consumer sub) throws AMQException;
void deliverAsync();
@@ -227,43 +227,43 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
void checkCapacity(AMQSessionModel channel);
/**
- * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
+ * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
* already exists.
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+ * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists.
* </table>
*
* @todo Not an AMQP exception as no status code.
*
* @todo Move to top level, used outside this class.
*/
- static final class ExistingExclusiveSubscription extends AMQException
+ static final class ExistingExclusiveConsumer extends AMQException
{
- public ExistingExclusiveSubscription()
+ public ExistingExclusiveConsumer()
{
super("");
}
}
/**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
+ * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
* already exists.
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
+ * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists.
* </table>
*
* @todo Not an AMQP exception as no status code.
*
* @todo Move to top level, used outside this class.
*/
- static final class ExistingSubscriptionPreventsExclusive extends AMQException
+ static final class ExistingConsumerPreventsExclusive extends AMQException
{
- public ExistingSubscriptionPreventsExclusive()
+ public ExistingConsumerPreventsExclusive()
{
super("");
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedSubscriptionMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
index e7e34ac2b8..a9b36c1b24 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedSubscriptionMessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,16 +28,16 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
-public class AssignedSubscriptionMessageGroupManager implements MessageGroupManager
+public class AssignedConsumerMessageGroupManager implements MessageGroupManager
{
- private static final Logger _logger = LoggerFactory.getLogger(AssignedSubscriptionMessageGroupManager.class);
+ private static final Logger _logger = LoggerFactory.getLogger(AssignedConsumerMessageGroupManager.class);
private final String _groupId;
- private final ConcurrentHashMap<Integer, QueueSubscription> _groupMap = new ConcurrentHashMap<Integer, QueueSubscription>();
+ private final ConcurrentHashMap<Integer, QueueConsumer> _groupMap = new ConcurrentHashMap<Integer, QueueConsumer>();
private final int _groupMask;
- public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups)
+ public AssignedConsumerMessageGroupManager(final String groupId, final int maxGroups)
{
_groupId = groupId;
_groupMask = pow2(maxGroups)-1;
@@ -53,13 +53,13 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
return val;
}
- public QueueSubscription getAssignedSubscription(final QueueEntry entry)
+ public QueueConsumer getAssignedConsumer(final QueueEntry entry)
{
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
}
- public boolean acceptMessage(QueueSubscription sub, QueueEntry entry)
+ public boolean acceptMessage(QueueConsumer sub, QueueEntry entry)
{
if(assignMessage(sub, entry))
{
@@ -71,7 +71,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
}
}
- private boolean assignMessage(QueueSubscription sub, QueueEntry entry)
+ private boolean assignMessage(QueueConsumer sub, QueueEntry entry)
{
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
if(groupVal == null)
@@ -81,7 +81,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
else
{
Integer group = groupVal.hashCode() & _groupMask;
- QueueSubscription assignedSub = _groupMap.get(group);
+ QueueConsumer assignedSub = _groupMap.get(group);
if(assignedSub == sub)
{
return true;
@@ -105,7 +105,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
}
}
- public QueueEntry findEarliestAssignedAvailableEntry(QueueSubscription sub)
+ public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub)
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
@@ -115,9 +115,9 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private class EntryFinder implements QueueEntryVisitor
{
private QueueEntry _entry;
- private QueueSubscription _sub;
+ private QueueConsumer _sub;
- public EntryFinder(final QueueSubscription sub)
+ public EntryFinder(final QueueConsumer sub)
{
_sub = sub;
}
@@ -136,7 +136,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
}
Integer group = groupId.hashCode() & _groupMask;
- Subscription assignedSub = _groupMap.get(group);
+ Consumer assignedSub = _groupMap.get(group);
if(assignedSub == _sub)
{
_entry = entry;
@@ -154,9 +154,9 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
}
}
- public void clearAssignments(QueueSubscription sub)
+ public void clearAssignments(QueueConsumer sub)
{
- Iterator<QueueSubscription> subIter = _groupMap.values().iterator();
+ Iterator<QueueConsumer> subIter = _groupMap.values().iterator();
while(subIter.hasNext())
{
if(subIter.next() == sub)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index 805bccd744..d5c03abc93 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,23 +38,23 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
private final String _groupId;
private final String _defaultGroup;
private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
- private final SubscriptionResetHelper _resetHelper;
+ private final ConsumerResetHelper _resetHelper;
private final class Group
{
private final Object _group;
- private QueueSubscription _subscription;
+ private QueueConsumer _consumer;
private int _activeCount;
- private Group(final Object key, final QueueSubscription subscription)
+ private Group(final Object key, final QueueConsumer consumer)
{
_group = key;
- _subscription = subscription;
+ _consumer = consumer;
}
public boolean add()
{
- if(_subscription != null)
+ if(_consumer != null)
{
_activeCount++;
return true;
@@ -69,8 +69,8 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
if(--_activeCount == 0)
{
- _resetHelper.resetSubPointersForGroups(_subscription, false);
- _subscription = null;
+ _resetHelper.resetSubPointersForGroups(_consumer, false);
+ _consumer = null;
_groupMap.remove(_group);
}
}
@@ -100,12 +100,12 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
public boolean isValid()
{
- return !(_subscription == null || (_activeCount == 0 && _subscription.isClosed()));
+ return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed()));
}
- public QueueSubscription getSubscription()
+ public QueueConsumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
@Override
@@ -113,28 +113,28 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
return "Group{" +
"_group=" + _group +
- ", _subscription=" + _subscription +
+ ", _consumer=" + _consumer +
", _activeCount=" + _activeCount +
'}';
}
}
- public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, SubscriptionResetHelper resetHelper)
+ public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
{
_groupId = groupId;
_defaultGroup = defaultGroup;
_resetHelper = resetHelper;
}
- public synchronized QueueSubscription getAssignedSubscription(final QueueEntry entry)
+ public synchronized QueueConsumer getAssignedConsumer(final QueueEntry entry)
{
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
- return group == null || !group.isValid() ? null : group.getSubscription();
+ return group == null || !group.isValid() ? null : group.getConsumer();
}
- public synchronized boolean acceptMessage(final QueueSubscription sub, final QueueEntry entry)
+ public synchronized boolean acceptMessage(final QueueConsumer sub, final QueueEntry entry)
{
if(assignMessage(sub, entry))
{
@@ -146,7 +146,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- private boolean assignMessage(final QueueSubscription sub, final QueueEntry entry)
+ private boolean assignMessage(final QueueConsumer sub, final QueueEntry entry)
{
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
@@ -158,15 +158,15 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_groupMap.put(groupId, group);
// there's a small change that the group became empty between the point at which getNextAvailable() was
- // called on the subscription, and when accept message is called... in that case we want to avoid delivering
+ // called on the consumer, and when accept message is called... in that case we want to avoid delivering
// out of order
- if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
+ if(_resetHelper.isEntryAheadOfConsumer(entry, sub))
{
return false;
}
}
- Subscription assignedSub = group.getSubscription();
+ Consumer assignedSub = group.getConsumer();
if(assignedSub == sub)
{
@@ -179,7 +179,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueSubscription sub)
+ public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer sub)
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
@@ -189,9 +189,9 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
private class EntryFinder implements QueueEntryVisitor
{
private QueueEntry _entry;
- private QueueSubscription _sub;
+ private QueueConsumer _sub;
- public EntryFinder(final QueueSubscription sub)
+ public EntryFinder(final QueueConsumer sub)
{
_sub = sub;
}
@@ -206,7 +206,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
- if(group != null && group.getSubscription() == _sub)
+ if(group != null && group.getConsumer() == _sub)
{
_entry = entry;
return true;
@@ -224,7 +224,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
- public void clearAssignments(final QueueSubscription sub)
+ public void clearAssignments(final QueueConsumer sub)
{
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
index 4ecddf6181..740a96bf2d 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
@@ -20,22 +20,20 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.subscription.Subscription;
-
public interface MessageGroupManager
{
- public interface SubscriptionResetHelper
+ public interface ConsumerResetHelper
{
- public void resetSubPointersForGroups(QueueSubscription subscription, boolean clearAssignments);
+ public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments);
- boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub);
+ boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub);
}
- QueueSubscription getAssignedSubscription(QueueEntry entry);
+ QueueConsumer getAssignedConsumer(QueueEntry entry);
- boolean acceptMessage(QueueSubscription sub, QueueEntry entry);
+ boolean acceptMessage(QueueConsumer sub, QueueEntry entry);
- QueueEntry findEarliestAssignedAvailableEntry(QueueSubscription sub);
+ QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub);
- void clearAssignments(QueueSubscription sub);
+ void clearAssignments(QueueConsumer sub);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
index 4f87e18e63..6918ae683c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.queue.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -38,16 +36,16 @@ public abstract class OutOfOrderQueue extends SimpleAMQQueue
}
@Override
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
+ // check that all consumers are not in advance of the entry
+ QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator();
while(subIter.advance() && !entry.isAcquired())
{
- final QueueSubscription subscription = subIter.getNode().getSubscription();
- if(!subscription.isClosed())
+ final QueueConsumer consumer = subIter.getNode().getConsumer();
+ if(!consumer.isClosed())
{
- QueueContext context = subscription.getQueueContext();
+ QueueContext context = consumer.getQueueContext();
if(context != null)
{
QueueEntry released = context.getReleasedEntry();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index f683159e12..d8332f7306 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -33,8 +33,8 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.util.StateChangeListener;
import java.text.MessageFormat;
@@ -47,16 +47,16 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-class QueueSubscription<T extends SubscriptionTarget> implements Subscription
+class QueueConsumer<T extends ConsumerTarget> implements Consumer
{
- private static final Logger _logger = Logger.getLogger(QueueSubscription.class);
+ private static final Logger _logger = Logger.getLogger(QueueConsumer.class);
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final long _subscriptionID;
+ private final long _id;
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
private final Lock _stateChangeLock = new ReentrantLock();
private final long _createTime = System.currentTimeMillis();
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this);
private final boolean _acquires;
private final boolean _seesRequeues;
private final String _consumerName;
@@ -69,39 +69,39 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
private SimpleAMQQueue _queue;
private GenericActor _logActor;
- static final EnumMap<SubscriptionTarget.State, State> STATE_MAP =
- new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class);
+ static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
+ new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
static
{
- STATE_MAP.put(SubscriptionTarget.State.ACTIVE, State.ACTIVE);
- STATE_MAP.put(SubscriptionTarget.State.SUSPENDED, State.SUSPENDED);
- STATE_MAP.put(SubscriptionTarget.State.CLOSED, State.CLOSED);
+ STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
+ STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED);
+ STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED);
}
private final T _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
private volatile QueueContext _queueContext;
- private StateChangeListener<? extends Subscription, State> _stateListener = new StateChangeListener<Subscription, State>()
+ private StateChangeListener<? extends Consumer, State> _stateListener = new StateChangeListener<Consumer, State>()
{
- public void stateChanged(Subscription sub, State oldState, State newState)
+ public void stateChanged(Consumer sub, State oldState, State newState)
{
CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
}
};
private boolean _noLocal;
- QueueSubscription(final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final boolean acquires,
- final boolean seesRequeues,
- final String consumerName,
- final boolean isTransient,
- T target)
+ QueueConsumer(final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final boolean acquires,
+ final boolean seesRequeues,
+ final String consumerName,
+ final boolean isTransient,
+ T target)
{
_messageClass = messageClass;
_sessionReference = target.getSessionModel().getConnectionReference();
- _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
+ _id = SUB_ID_GENERATOR.getAndIncrement();
_filters = filters;
_acquires = acquires;
_seesRequeues = seesRequeues;
@@ -109,23 +109,23 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
_isTransient = isTransient;
_target = target;
_target.setStateListener(
- new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>()
+ new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
{
@Override
- public void stateChanged(final SubscriptionTarget object,
- final SubscriptionTarget.State oldState,
- final SubscriptionTarget.State newState)
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
{
targetStateChanged(oldState, newState);
}
});
}
- private void targetStateChanged(final SubscriptionTarget.State oldState, final SubscriptionTarget.State newState)
+ private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
{
if(oldState != newState)
{
- if(newState == SubscriptionTarget.State.CLOSED)
+ if(newState == ConsumerTarget.State.CLOSED)
{
if(_targetClosed.compareAndSet(false,true))
{
@@ -138,7 +138,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
}
}
- if(newState == SubscriptionTarget.State.CLOSED && oldState != newState && !_closed.get())
+ if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
{
try
{
@@ -146,12 +146,12 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
}
catch (AMQException e)
{
- _logger.error("Unable to remove to remove subscription", e);
+ _logger.error("Unable to remove to remove consumer", e);
throw new RuntimeException(e);
}
}
- final StateChangeListener<Subscription, State> stateListener =
- (StateChangeListener<Subscription, State>) getStateListener();
+ final StateChangeListener<Consumer, State> stateListener =
+ (StateChangeListener<Consumer, State>) getStateListener();
if(stateListener != null)
{
stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
@@ -202,8 +202,8 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
try
{
_target.close();
- _target.subscriptionRemoved(this);
- _queue.unregisterSubscription(this);
+ _target.consumerRemoved(this);
+ _queue.unregisterConsumer(this);
}
finally
{
@@ -258,13 +258,13 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
{
if(getQueue() != null)
{
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
+ throw new IllegalStateException("Attempt to set queue for consumer " + this + " to " + queue + "when already set to " + getQueue());
}
_queue = queue;
String queueString = new QueueLogSubject(_queue).toLogString();
- _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID())
+ _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ "("
// queueString is [vh(/{0})/qu({1}) ] so need to trim
// ^ ^^
@@ -295,7 +295,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
@Override
public final void flush() throws AMQException
{
- getQueue().flushSubscription(this);
+ getQueue().flushConsumer(this);
}
@Override
@@ -309,17 +309,17 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
return _runner;
}
- public final long getSubscriptionID()
+ public final long getId()
{
- return _subscriptionID;
+ return _id;
}
- public final StateChangeListener<? extends Subscription, State> getStateListener()
+ public final StateChangeListener<? extends Consumer, State> getStateListener()
{
return _stateListener;
}
- public final void setStateListener(StateChangeListener<? extends Subscription, State> listener)
+ public final void setStateListener(StateChangeListener<? extends Consumer, State> listener)
{
_stateListener = listener;
}
@@ -430,7 +430,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
return _createTime;
}
- public final MessageInstance.SubscriptionAcquiredState getOwningState()
+ public final MessageInstance.ConsumerAcquiredState getOwningState()
{
return _owningState;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubscriptionList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
index 7329cbb88b..82e9d58cf3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubscriptionList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
@@ -24,28 +24,28 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-class SubscriptionList
+class QueueConsumerList
{
- private final SubscriptionNode _head = new SubscriptionNode();
+ private final ConsumerNode _head = new ConsumerNode();
- private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
- private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head);
+ private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head);
+ private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head);
private final AtomicInteger _size = new AtomicInteger();
- public static final class SubscriptionNode
+ public static final class ConsumerNode
{
private final AtomicBoolean _deleted = new AtomicBoolean();
- private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
- private final QueueSubscription _sub;
+ private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>();
+ private final QueueConsumer _sub;
- public SubscriptionNode()
+ public ConsumerNode()
{
//used for sentinel head and dummy node construction
_sub = null;
_deleted.set(true);
}
- public SubscriptionNode(final QueueSubscription sub)
+ public ConsumerNode(final QueueConsumer sub)
{
//used for regular node construction
_sub = sub;
@@ -57,12 +57,12 @@ class SubscriptionList
*
* @return the next non-deleted node, or null if none was found.
*/
- public SubscriptionNode findNext()
+ public ConsumerNode findNext()
{
- SubscriptionNode next = nextNode();
+ ConsumerNode next = nextNode();
while(next != null && next.isDeleted())
{
- final SubscriptionNode newNext = next.nextNode();
+ final ConsumerNode newNext = next.nextNode();
if(newNext != null)
{
//try to move our _next reference forward to the 'newNext'
@@ -86,7 +86,7 @@ class SubscriptionList
*
* @return the immediately next node in the structure, or null if at the tail.
*/
- protected SubscriptionNode nextNode()
+ protected ConsumerNode nextNode()
{
return _next.get();
}
@@ -94,10 +94,10 @@ class SubscriptionList
/**
* Used to initialise the 'next' reference. Will only succeed if the reference was not previously set.
*
- * @param node the SubscriptionNode to set as 'next'
+ * @param node the ConsumerNode to set as 'next'
* @return whether the operation succeeded
*/
- private boolean setNext(final SubscriptionNode node)
+ private boolean setNext(final ConsumerNode node)
{
return _next.compareAndSet(null, node);
}
@@ -112,18 +112,18 @@ class SubscriptionList
return _deleted.compareAndSet(false,true);
}
- public QueueSubscription getSubscription()
+ public QueueConsumer getConsumer()
{
return _sub;
}
}
- private void insert(final SubscriptionNode node, final boolean count)
+ private void insert(final ConsumerNode node, final boolean count)
{
for (;;)
{
- SubscriptionNode tail = _tail.get();
- SubscriptionNode next = tail.nextNode();
+ ConsumerNode tail = _tail.get();
+ ConsumerNode next = tail.nextNode();
if (tail == _tail.get())
{
if (next == null)
@@ -146,35 +146,35 @@ class SubscriptionList
}
}
- public void add(final QueueSubscription sub)
+ public void add(final QueueConsumer sub)
{
- SubscriptionNode node = new SubscriptionNode(sub);
+ ConsumerNode node = new ConsumerNode(sub);
insert(node, true);
}
- public boolean remove(final QueueSubscription sub)
+ public boolean remove(final QueueConsumer sub)
{
- SubscriptionNode prevNode = _head;
- SubscriptionNode node = _head.nextNode();
+ ConsumerNode prevNode = _head;
+ ConsumerNode node = _head.nextNode();
while(node != null)
{
- if(sub.equals(node.getSubscription()) && node.delete())
+ if(sub.equals(node.getConsumer()) && node.delete())
{
_size.decrementAndGet();
- SubscriptionNode tail = _tail.get();
+ ConsumerNode tail = _tail.get();
if(node == tail)
{
//we cant remove the last node from the structure for
//correctness reasons, however we have just 'deleted'
//the tail. Inserting an empty dummy node after it will
- //let us scavenge the node containing the Subscription.
- insert(new SubscriptionNode(), false);
+ //let us scavenge the node containing the Consumer.
+ insert(new ConsumerNode(), false);
}
//advance the next node reference in the 'prevNode' to scavenge
- //the newly 'deleted' node for the Subscription.
+ //the newly 'deleted' node for the Consumer.
prevNode.findNext();
nodeMarkerCleanup(node);
@@ -189,9 +189,9 @@ class SubscriptionList
return false;
}
- private void nodeMarkerCleanup(final SubscriptionNode node)
+ private void nodeMarkerCleanup(final ConsumerNode node)
{
- SubscriptionNode markedNode = _subNodeMarker.get();
+ ConsumerNode markedNode = _subNodeMarker.get();
if(node == markedNode)
{
//if the marked node is the one we are removing, then
@@ -200,7 +200,7 @@ class SubscriptionList
//into the list and find the next node to use.
//Because we inserted a dummy if node was the
//tail, markedNode.nextNode() can never be null.
- SubscriptionNode dummy = new SubscriptionNode();
+ ConsumerNode dummy = new ConsumerNode();
dummy.setNext(markedNode.nextNode());
//if the CAS fails the marked node has changed, thus
@@ -219,53 +219,53 @@ class SubscriptionList
}
}
- public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode)
+ public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode)
{
return _subNodeMarker.compareAndSet(expected, nextNode);
}
/**
- * Get the current marked SubscriptionNode. This should only be used only to index into the list and find the next node
+ * Get the current marked ConsumerNode. This should only be used only to index into the list and find the next node
* after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node
* with reference to the next node.
*
* @return the previously marked node (or a dummy if it was subsequently deleted)
*/
- public SubscriptionNode getMarkedNode()
+ public ConsumerNode getMarkedNode()
{
return _subNodeMarker.get();
}
- public static class SubscriptionNodeIterator
+ public static class ConsumerNodeIterator
{
- private SubscriptionNode _lastNode;
+ private ConsumerNode _lastNode;
- SubscriptionNodeIterator(SubscriptionNode startNode)
+ ConsumerNodeIterator(ConsumerNode startNode)
{
_lastNode = startNode;
}
- public SubscriptionNode getNode()
+ public ConsumerNode getNode()
{
return _lastNode;
}
public boolean advance()
{
- SubscriptionNode nextNode = _lastNode.findNext();
+ ConsumerNode nextNode = _lastNode.findNext();
_lastNode = nextNode;
return _lastNode != null;
}
}
- public SubscriptionNodeIterator iterator()
+ public ConsumerNodeIterator iterator()
{
- return new SubscriptionNodeIterator(_head);
+ return new ConsumerNodeIterator(_head);
}
- public SubscriptionNode getHead()
+ public ConsumerNode getHead()
{
return _head;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 61f38f963c..d984cf8ab4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ea50337b61..7908cc9de7 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -186,7 +186,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public boolean acquire()
{
- return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+ return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
private boolean acquire(final EntryState state)
@@ -201,7 +201,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
- public boolean acquire(Subscription sub)
+ public boolean acquire(Consumer sub)
{
final boolean acquired = acquire(sub.getOwningState());
if(acquired)
@@ -211,17 +211,17 @@ public abstract class QueueEntryImpl implements QueueEntry
return acquired;
}
- public boolean acquiredBySubscription()
+ public boolean acquiredByConsumer()
{
- return (_state instanceof SubscriptionAcquiredState);
+ return (_state instanceof ConsumerAcquiredState);
}
- public boolean isAcquiredBy(Subscription subscription)
+ public boolean isAcquiredBy(Consumer consumer)
{
EntryState state = _state;
- return state instanceof SubscriptionAcquiredState
- && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
+ return state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer;
}
public void release()
@@ -231,7 +231,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof SubscriptionAcquiredState)
+ if(state instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -263,12 +263,12 @@ public abstract class QueueEntryImpl implements QueueEntry
return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
}
- public Subscription getDeliveredSubscription()
+ public Consumer getDeliveredConsumer()
{
EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
+ if (state instanceof ConsumerAcquiredState)
{
- return ((SubscriptionAcquiredState) state).getSubscription();
+ return ((ConsumerAcquiredState) state).getConsumer();
}
else
{
@@ -278,16 +278,16 @@ public abstract class QueueEntryImpl implements QueueEntry
public void reject()
{
- Subscription subscription = getDeliveredSubscription();
+ Consumer consumer = getDeliveredConsumer();
- if (subscription != null)
+ if (consumer != null)
{
if (_rejectedBy == null)
{
_rejectedBy = new HashSet<Long>();
}
- _rejectedBy.add(subscription.getSubscriptionID());
+ _rejectedBy.add(consumer.getId());
}
else
{
@@ -295,12 +295,12 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(Consumer consumer)
{
- if (_rejectedBy != null) // We have subscriptions that rejected this message
+ if (_rejectedBy != null) // We have consumers that rejected this message
{
- return _rejectedBy.contains(subscription.getSubscriptionID());
+ return _rejectedBy.contains(consumer.getId());
}
else // This message hasn't been rejected yet.
{
@@ -314,8 +314,8 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- Subscription s = null;
- if (state instanceof SubscriptionAcquiredState)
+ Consumer s = null;
+ if (state instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -490,7 +490,7 @@ public abstract class QueueEntryImpl implements QueueEntry
@Override
public boolean resend() throws AMQException
{
- Subscription sub = getDeliveredSubscription();
+ Consumer sub = getDeliveredConsumer();
if(sub != null)
{
return sub.resend(this);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 22a2029494..005d9b66b3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -32,8 +32,8 @@ import org.apache.qpid.transport.TransportException;
/**
* QueueRunners are Runnables used to process a queue when requiring
- * asynchronous message delivery to subscriptions, which is necessary
- * when straight-through delivery of a message to a subscription isn't
+ * asynchronous message delivery to consumers, which is necessary
+ * when straight-through delivery of a message to a consumer isn't
* possible during the enqueue operation.
*/
public class QueueRunner implements Runnable
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 36dc04a5fc..78585997be 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -46,8 +46,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -56,8 +56,8 @@ import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SimpleAMQQueue implements AMQQueue,
- StateChangeListener<QueueSubscription, Subscription.State>,
- MessageGroupManager.SubscriptionResetHelper
+ StateChangeListener<QueueConsumer, Consumer.State>,
+ MessageGroupManager.ConsumerResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -92,9 +92,9 @@ public class SimpleAMQQueue implements AMQQueue,
private final QueueEntryList<QueueEntry> _entries;
- private final SubscriptionList _subscriptionList = new SubscriptionList();
+ private final QueueConsumerList _consumerList = new QueueConsumerList();
- private volatile QueueSubscription _exclusiveSubscriber;
+ private volatile QueueConsumer _exclusiveSubscriber;
@@ -177,8 +177,8 @@ public class SimpleAMQQueue implements AMQQueue,
private int _maximumDeliveryCount;
private final MessageGroupManager _messageGroupManager;
- private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
- new ArrayList<SubscriptionRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -247,7 +247,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -378,15 +378,15 @@ public class SimpleAMQQueue implements AMQQueue,
return _name;
}
- // ------ Manage Subscriptions
+ // ------ Manage Consumers
@Override
- public QueueSubscription registerSubscription(final SubscriptionTarget target,
- final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName,
- EnumSet<Subscription.Option> optionSet) throws AMQException
+ public QueueConsumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<Consumer.Option> optionSet) throws AMQException
{
// Access control
@@ -396,59 +396,59 @@ public class SimpleAMQQueue implements AMQQueue,
}
- if (hasExclusiveSubscriber())
+ if (hasExclusiveConsumer())
{
- throw new ExistingExclusiveSubscription();
+ throw new ExistingExclusiveConsumer();
}
- boolean exclusive = optionSet.contains(Subscription.Option.EXCLUSIVE);
- boolean isTransient = optionSet.contains(Subscription.Option.TRANSIENT);
+ boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
if (exclusive && !isTransient && getConsumerCount() != 0)
{
- throw new ExistingSubscriptionPreventsExclusive();
+ throw new ExistingConsumerPreventsExclusive();
}
- QueueSubscription subscription = new QueueSubscription(filters, messageClass,
- optionSet.contains(Subscription.Option.ACQUIRES),
- optionSet.contains(Subscription.Option.SEES_REQUEUES),
- consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target);
- target.subscriptionRegistered(subscription);
+ QueueConsumer consumer = new QueueConsumer(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ target.consumerAdded(consumer);
if (exclusive && !isTransient)
{
- _exclusiveSubscriber = subscription;
+ _exclusiveSubscriber = consumer;
}
- if(subscription.isActive())
+ if(consumer.isActive())
{
_activeSubscriberCount.incrementAndGet();
}
- subscription.setStateListener(this);
- subscription.setQueueContext(new QueueContext(_entries.getHead()));
+ consumer.setStateListener(this);
+ consumer.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
- subscription.setQueue(this, exclusive);
+ consumer.setQueue(this, exclusive);
if(_nolocal)
{
- subscription.setNoLocal(_nolocal);
+ consumer.setNoLocal(_nolocal);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionRegistered(this, subscription);
+ listener.consumerAdded(this, consumer);
}
}
- _subscriptionList.add(subscription);
+ _consumerList.add(consumer);
- //Increment consumerCountHigh if necessary. (un)registerSubscription are both
+ //Increment consumerCountHigh if necessary. (un)registerConsumer are both
//synchronized methods so we don't need additional synchronization here
if(_consumerCountHigh.get() < getConsumerCount())
{
@@ -457,7 +457,7 @@ public class SimpleAMQQueue implements AMQQueue,
if (isDeleted())
{
- subscription.queueDeleted();
+ consumer.queueDeleted();
}
}
else
@@ -465,27 +465,27 @@ public class SimpleAMQQueue implements AMQQueue,
// TODO
}
- deliverAsync(subscription);
+ deliverAsync(consumer);
- return subscription;
+ return consumer;
}
- synchronized void unregisterSubscription(final QueueSubscription subscription) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
{
- if (subscription == null)
+ if (consumer == null)
{
- throw new NullPointerException("subscription argument is null");
+ throw new NullPointerException("consumer argument is null");
}
- boolean removed = _subscriptionList.remove(subscription);
+ boolean removed = _consumerList.remove(consumer);
if (removed)
{
- subscription.close();
+ consumer.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
- subscription.setQueueContext(null);
+ consumer.setQueueContext(null);
if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
{
@@ -494,20 +494,20 @@ public class SimpleAMQQueue implements AMQQueue,
if(_messageGroupManager != null)
{
- resetSubPointersForGroups(subscription, true);
+ resetSubPointersForGroups(consumer, true);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionUnregistered(this, subscription);
+ listener.consumerRemoved(this, consumer);
}
}
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
+ if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -516,57 +516,57 @@ public class SimpleAMQQueue implements AMQQueue,
getVirtualHost().removeQueue(this);
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted();
+ // we need to manually fire the event to the removed consumer (which was the last one left for this
+ // queue. This is because the delete method uses the consumer set which has just been cleared
+ consumer.queueDeleted();
}
}
}
- public Collection<Subscription> getConsumers()
+ public Collection<Consumer> getConsumers()
{
- List<Subscription> consumers = new ArrayList<Subscription>();
- SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ List<Consumer> consumers = new ArrayList<Consumer>();
+ QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
while(iter.advance())
{
- consumers.add(iter.getNode().getSubscription());
+ consumers.add(iter.getNode().getConsumer());
}
return consumers;
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.add(listener);
+ _consumerListeners.add(listener);
}
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.remove(listener);
+ _consumerListeners.remove(listener);
}
}
- public void resetSubPointersForGroups(QueueSubscription subscription, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
- _messageGroupManager.clearAssignments(subscription);
+ _messageGroupManager.clearAssignments(consumer);
}
if(entry != null)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- QueueSubscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -644,35 +644,35 @@ public class SimpleAMQQueue implements AMQQueue,
QueueEntry entry;
- final QueueSubscription exclusiveSub = _exclusiveSubscriber;
+ final QueueConsumer exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
/*
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_consumerList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
+ node = _consumerList.getMarkedNode();
nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
}
}
@@ -686,13 +686,13 @@ public class SimpleAMQQueue implements AMQQueue,
if (nextNode == null)
{
loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _consumerList.getHead();
}
else
{
- // if subscription at end, and active, offer
- QueueSubscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ // if consumer at end, and active, offer
+ QueueConsumer sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -702,7 +702,7 @@ public class SimpleAMQQueue implements AMQQueue,
if (entry.isAvailable())
{
- checkSubscriptionsNotAheadOfDelivery(entry);
+ checkConsumersNotAheadOfDelivery(entry);
if (exclusiveSub != null)
{
@@ -723,7 +723,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
- private void deliverToSubscription(final QueueSubscription sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
throws AMQException
{
@@ -732,14 +732,14 @@ public class SimpleAMQQueue implements AMQQueue,
try
{
if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ && consumerReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(entry);
}
else
@@ -755,7 +755,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private boolean assign(final QueueSubscription sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
{
@@ -769,17 +769,17 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private boolean mightAssign(final QueueSubscription sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
return true;
}
- QueueSubscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -813,7 +813,7 @@ public class SimpleAMQQueue implements AMQQueue,
getAtomicQueueCount().incrementAndGet();
}
- private void deliverMessage(final QueueSubscription sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -824,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue,
sub.send(entry, batch);
}
- private boolean subscriptionReadyAndHasInterest(final QueueSubscription sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final QueueSubscription sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
if (subContext != null)
@@ -845,7 +845,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private void updateSubRequeueEntry(final QueueSubscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
@@ -865,11 +865,11 @@ public class SimpleAMQQueue implements AMQQueue,
public void requeue(QueueEntry entry)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- QueueSubscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -882,11 +882,11 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ public void dequeue(QueueEntry entry, Consumer sub)
{
decrementQueueCount();
decrementQueueSize(entry);
- if (entry.acquiredBySubscription())
+ if (entry.acquiredByConsumer())
{
_deliveredMessages.decrementAndGet();
}
@@ -914,17 +914,17 @@ public class SimpleAMQQueue implements AMQQueue,
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
{
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
+ /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
+ entry to resend and move back the consumer pointer. */
- subscription.getSendLock();
+ consumer.getSendLock();
try
{
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
- deliverMessage((QueueSubscription)subscription, entry, false);
+ deliverMessage((QueueConsumer) consumer, entry, false);
return true;
}
else
@@ -934,7 +934,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
finally
{
- subscription.releaseSendLock();
+ consumer.releaseSendLock();
}
}
@@ -942,7 +942,7 @@ public class SimpleAMQQueue implements AMQQueue,
public int getConsumerCount()
{
- return _subscriptionList.size();
+ return _consumerList.size();
}
public int getConsumerCountHigh()
@@ -1025,16 +1025,16 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void stateChanged(QueueSubscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(QueueConsumer sub, Consumer.State oldState, Consumer.State newState)
{
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ if (oldState == Consumer.State.ACTIVE && newState != Consumer.State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if (newState == Subscription.State.ACTIVE)
+ else if (newState == Consumer.State.ACTIVE)
{
- if (oldState != Subscription.State.ACTIVE)
+ if (oldState != Consumer.State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -1058,12 +1058,12 @@ public class SimpleAMQQueue implements AMQQueue,
return _atomicQueueSize;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(QueueSubscription exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1079,9 +1079,9 @@ public class SimpleAMQQueue implements AMQQueue,
return _entries;
}
- protected SubscriptionList getSubscriptionList()
+ protected QueueConsumerList getConsumerList()
{
- return _subscriptionList;
+ return _consumerList;
}
@@ -1315,11 +1315,11 @@ public class SimpleAMQQueue implements AMQQueue,
b.getExchange().removeBinding(b);
}
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- QueueSubscription s = subscriptionIter.getNode().getSubscription();
+ QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
if (s != null)
{
s.queueDeleted();
@@ -1449,7 +1449,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1463,17 +1463,17 @@ public class SimpleAMQQueue implements AMQQueue,
}
- public void flushSubscription(Subscription sub) throws AMQException
+ public void flushConsumer(Consumer sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
throw new AMQSecurityException("Permission denied: " + getName());
}
- flushSubscription(sub, Long.MAX_VALUE);
+ flushConsumer(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+ public boolean flushConsumer(Consumer sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1494,8 +1494,8 @@ public class SimpleAMQQueue implements AMQQueue,
sub.getSendLock();
}
- atTail = attemptDelivery((QueueSubscription)sub, true);
- if (atTail && getNextAvailableEntry((QueueSubscription)sub) == null)
+ atTail = attemptDelivery((QueueConsumer)sub, true);
+ if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
{
queueEmpty = true;
}
@@ -1529,21 +1529,21 @@ public class SimpleAMQQueue implements AMQQueue,
}
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if (!hasExclusiveSubscriber())
+ if (!hasExclusiveConsumer())
{
- advanceAllSubscriptions();
+ advanceAllConsumers();
}
return atTail;
}
/**
- * Attempt delivery for the given subscription.
+ * Attempt delivery for the given consumer.
*
- * Looks up the next node for the subscription and attempts to deliver it.
+ * Looks up the next node for the consumer and attempts to deliver it.
*
*
* @param sub
@@ -1551,7 +1551,7 @@ public class SimpleAMQQueue implements AMQQueue,
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(QueueSubscription sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1570,7 +1570,7 @@ public class SimpleAMQQueue implements AMQQueue,
if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(node);
}
else
@@ -1581,7 +1581,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
else // Not enough Credit for message and wouldSuspend
{
- //QPID-1187 - Treat the subscription as suspended for this message
+ //QPID-1187 - Treat the consumer as suspended for this message
// and wait for the message to be removed to continue delivery.
subActive = false;
node.addStateChangeListener(new QueueEntryListener(sub));
@@ -1594,13 +1594,13 @@ public class SimpleAMQQueue implements AMQQueue,
return atTail || !subActive;
}
- protected void advanceAllSubscriptions() throws AMQException
+ protected void advanceAllConsumers() throws AMQException
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
{
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- QueueSubscription sub = subNode.getSubscription();
+ QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
+ QueueConsumer sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1612,7 +1612,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- private QueueEntry getNextAvailableEntry(final QueueSubscription sub)
+ private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
throws AMQException
{
QueueContext context = sub.getQueueContext();
@@ -1653,7 +1653,7 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- public boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub)
+ public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
{
QueueContext context = sub.getQueueContext();
if(context != null)
@@ -1672,14 +1672,14 @@ public class SimpleAMQQueue implements AMQQueue,
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * consumer (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to consumers unsuspending) which are
* capable of accepting/delivering all messages then these messages would
* otherwise remain on the queue.
*
* processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
+ * there are consumers that can deliver them. If there are no
+ * consumers capable of delivering the remaining messages on the queue
* then processQueue should stop to prevent spinning.
*
* Since processQueue is runs in a fixed size Executor, it should not run
@@ -1703,7 +1703,7 @@ public class SimpleAMQQueue implements AMQQueue,
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- final int numSubs = _subscriptionList.size();
+ final int numSubs = _consumerList.size();
final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
@@ -1714,8 +1714,8 @@ public class SimpleAMQQueue implements AMQQueue,
// So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // we want to have one extra loop after every consumer has reached the point where it cannot move
+ // further, just in case the advance of one consumer in the last loop allows a different consumer to
// move forward in the next iteration
if (previousStateChangeCount != stateChangeCount)
@@ -1727,14 +1727,14 @@ public class SimpleAMQQueue implements AMQQueue,
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ boolean allConsumersDone = true;
+ boolean consumerDone;
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- QueueSubscription sub = subscriptionIter.getNode().getSubscription();
+ QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1742,8 +1742,8 @@ public class SimpleAMQQueue implements AMQQueue,
for(int i = 0 ; i < perSub; i++)
{
//attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub, true);
- if (subscriptionDone)
+ consumerDone = attemptDelivery(sub, true);
+ if (consumerDone)
{
sub.flushBatched();
if (lastLoop && !sub.isSuspended())
@@ -1754,9 +1754,9 @@ public class SimpleAMQQueue implements AMQQueue,
}
else
{
- //this subscription can accept additional deliveries, so we must
+ //this consumer can accept additional deliveries, so we must
//keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
+ allConsumersDone = false;
lastLoop = false;
if(--iterations == 0)
{
@@ -1775,24 +1775,24 @@ public class SimpleAMQQueue implements AMQQueue,
}
}
- if(allSubscriptionsDone && lastLoop)
+ if(allConsumersDone && lastLoop)
{
//We have done an extra loop already and there are again
//again no further delivery attempts possible, only
//keep going if state change demands it.
deliveryIncomplete = false;
}
- else if(allSubscriptionsDone)
+ else if(allConsumersDone)
{
- //All subscriptions reported being done, but we have to do
+ //All consumers reported being done, but we have to do
//an extra loop if the iterations are not exhausted and
//there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
+ deliveryIncomplete = _consumerList.size() != 0;
lastLoop = true;
}
else
{
- //some subscriptions can still accept more messages,
+ //some consumers can still accept more messages,
//keep going if iteration count allows.
lastLoop = false;
deliveryIncomplete = true;
@@ -1970,9 +1970,9 @@ public class SimpleAMQQueue implements AMQQueue,
private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
- private final QueueSubscription _sub;
+ private final QueueConsumer _sub;
- public QueueEntryListener(final QueueSubscription sub)
+ public QueueEntryListener(final QueueConsumer sub)
{
_sub = sub;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 6c63b30273..44bda5182a 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -30,7 +30,7 @@ import java.util.UUID;
public class SortedQueue extends OutOfOrderQueue
{
//Lock object to synchronize enqueue. Used instead of the object
- //monitor to prevent lock order issues with subscription sendLocks
+ //monitor to prevent lock order issues with consumer sendLocks
//and consumer updates in the super classes
private final Object _sortedQueueLock = new Object();
private final String _sortedPropertyName;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 54d63afe6f..c30a48b03a 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.transport.TransportException;
import java.util.concurrent.Executor;
@@ -38,7 +37,7 @@ class SubFlushRunner implements Runnable
private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
- private final QueueSubscription _sub;
+ private final QueueConsumer _sub;
private static int IDLE = 0;
private static int SCHEDULED = 1;
@@ -51,7 +50,7 @@ class SubFlushRunner implements Runnable
private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
- public SubFlushRunner(QueueSubscription sub)
+ public SubFlushRunner(QueueConsumer sub)
{
_sub = sub;
}
@@ -65,7 +64,7 @@ class SubFlushRunner implements Runnable
try
{
CurrentActor.set(_sub.getLogActor());
- complete = getQueue().flushSubscription(_sub, ITERATIONS);
+ complete = getQueue().flushConsumer(_sub, ITERATIONS);
}
catch (AMQException e)
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 2ee72b9a36..a9b99503ec 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.consumer;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -35,7 +35,6 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.StateChangeListener;
@@ -46,29 +45,26 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class MockSubscription implements SubscriptionTarget
+public class MockConsumer implements ConsumerTarget
{
private final List<String> _messageIds;
private boolean _closed = false;
private String tag = "mocktag";
private AMQQueue queue = null;
- private StateChangeListener<SubscriptionTarget, State> _listener = null;
+ private StateChangeListener<ConsumerTarget, State> _listener = null;
private State _state = State.ACTIVE;
private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>();
private final Lock _stateChangeLock = new ReentrantLock();
- private static final AtomicLong idGenerator = new AtomicLong(0);
- // Create a simple ID that increments for ever new Subscription
private boolean _isActive = true;
- private Subscription _subscription;
- public MockSubscription()
+ public MockConsumer()
{
_messageIds = null;
}
- public MockSubscription(List<String> messageIds)
+ public MockConsumer(List<String> messageIds)
{
_messageIds = messageIds;
}
@@ -177,13 +173,12 @@ public class MockSubscription implements SubscriptionTarget
}
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
@@ -199,7 +194,7 @@ public class MockSubscription implements SubscriptionTarget
}
@Override
- public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener)
+ public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener)
{
_listener = listener;
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
index b2bc351f8f..52a53e8d38 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
@@ -25,7 +25,7 @@ import java.util.List;
/**
* Test SUB Log Messages
*/
-public class SubscriptionMessagesTest extends AbstractTestMessages
+public class ConsumerMessagesTest extends AbstractTestMessages
{
public void testSubscriptionCreateALL()
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
index 17d68ef7c3..c1068b4a0b 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
@@ -38,7 +38,7 @@ public class VirtualHostMessagesTest extends AbstractTestMessages
validateLogMessage(log, "VHT-1001", expected);
}
- public void testSubscriptionClosed()
+ public void testVirtualhostClosed()
{
_logMessage = VirtualHostMessages.CLOSED();
List<Object> log = performLog();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 0d38b7002a..7092b50c52 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -32,7 +32,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import static org.mockito.Mockito.when;
@@ -66,10 +66,10 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
queue.enqueue(createMessage(9L, (byte) 0));
// Register subscriber
- queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- ArrayList<MessageInstance> msgs = getSubscription().getMessages();
+ ArrayList<MessageInstance> msgs = getConsumer().getMessages();
try
{
assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
new file mode 100644
index 0000000000..35508bb2c4
--- /dev/null
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
@@ -0,0 +1,445 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerListTest extends QpidTestCase
+{
+ private QueueConsumerList _subList;
+ private QueueConsumer _sub1;
+ private QueueConsumer _sub2;
+ private QueueConsumer _sub3;
+ private QueueConsumerList.ConsumerNode _node;
+
+ protected void setUp()
+ {
+ _subList = new QueueConsumerList();
+
+ _sub1 = newMockConsumer();
+ _sub2 = newMockConsumer();
+ _sub3 = newMockConsumer();
+
+ _subList.add(_sub1);
+ _subList.add(_sub2);
+ _subList.add(_sub3);
+
+ _node = _subList.getHead();
+ }
+
+
+ private QueueConsumer newMockConsumer()
+ {
+ ConsumerTarget target = mock(ConsumerTarget.class);
+ when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
+ return new QueueConsumer(null,null,true,true,"sub",false,target);
+ }
+
+ /**
+ * Test that if the first (non-head) node in the list is deleted (but is still present),
+ * it is not returned when searching through the list for the next viable node, and the
+ * subsequent viable node is returned instead.
+ */
+ public void testFindNextSkipsFirstDeletedNode()
+ {
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
+
+ assertNotNull("Returned node should not be null", _node = _node.findNext());
+ assertEquals("Should have returned node for 2nd consumer", _sub2, _node.getConsumer());
+
+ assertNotNull("Returned node should not be null", _node = _node.findNext());
+ assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer());
+ }
+
+ /**
+ * Test that if a central node in the list is deleted (but is still present),
+ * it is not returned when searching through the list for the next viable node,
+ * and the subsequent viable node is returned instead.
+ */
+ public void testFindNextSkipsCentralDeletedNode()
+ {
+ assertNotNull("Returned node should not be null", _node = _node.findNext());
+
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub2).delete());
+
+ assertNotNull("Returned node should not be null", _node = _node.findNext());
+ assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer());
+ }
+
+ /**
+ * Test that if the last node in the list is deleted (but is still present),
+ * it is not returned when searching through the list for the next viable node,
+ * and null is returned instead.
+ */
+ public void testFindNextSkipsLastDeletedNode()
+ {
+ assertNotNull("Returned node should not be null", _node = _node.findNext());
+ assertEquals("Should have returned node for 1st consumer", _sub1, _node.getConsumer());
+
+ assertNotNull("Returned node should not be null", _node = _node.findNext());
+ assertEquals("Should have returned node for 2nd consumer", _sub2, _node.getConsumer());
+
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub3).delete());
+
+ assertNull("Returned node should be null", _node = _node.findNext());
+ }
+
+ /**
+ * Test that if multiple nodes in the list are deleted (but still present), they
+ * are not returned when searching through the list for the next viable node,
+ * and the subsequent viable node is returned instead.
+ */
+ public void testFindNextSkipsMultipleDeletedNode()
+ {
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub2).delete());
+
+ assertNotNull("Returned node should not be null", _node = _node.findNext());
+ assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer());
+ }
+
+ /**
+ * Test that if a node in the list is marked 'deleted' it is still present in the list
+ * until actually removed. counter-test to verify above testing of getNext() method.
+ */
+ public void testDeletedNodeStillPresent()
+ {
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
+
+ assertNotNull("Node marked deleted should still be present", getNodeForConsumer(_subList, _sub1));
+ assertEquals("All 3 nodes are still expected to be present", 3, countNodes(_subList));
+ }
+
+ /**
+ * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given
+ * Consumer, or null if none is found.
+ */
+ private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final Consumer sub)
+ {
+ QueueConsumerList.ConsumerNode node = list.getHead();
+ while (node != null && node.getConsumer() != sub)
+ {
+ node = node.nextNode();
+ }
+
+ return node;
+ }
+
+ /**
+ * Counts the number of (non-head) nodes in the list.
+ */
+ private int countNodes(final QueueConsumerList list)
+ {
+ QueueConsumerList.ConsumerNode node = list.getHead();
+ int count;
+ for(count = -1; node != null; count++)
+ {
+ node = node.nextNode();
+ }
+
+ return count;
+ }
+
+ /**
+ * Tests that the head is returned as expected, and isn't the node for the first consumer.
+ */
+ public void testGetHead()
+ {
+ assertNotNull("List head should be non null", _node);
+ assertNotSame("Head should not be node for first consumer",
+ _node, getNodeForConsumer(_subList, _sub1));
+ }
+
+ /**
+ * Tests that the size is returned correctly in the face of additions and removals.
+ */
+ public void testGetSize()
+ {
+ QueueConsumerList subList = new QueueConsumerList();
+
+ assertEquals("Unexpected size result", 0, subList.size());
+
+ QueueConsumer sub1 = newMockConsumer();
+ QueueConsumer sub2 = newMockConsumer();
+ QueueConsumer sub3 = newMockConsumer();
+
+ subList.add(sub1);
+ assertEquals("Unexpected size result", 1, subList.size());
+
+ subList.add(sub2);
+ assertEquals("Unexpected size result", 2, subList.size());
+
+ subList.add(sub3);
+ assertEquals("Unexpected size result", 3, subList.size());
+
+ assertTrue("Removing consumer from list should have succeeded", subList.remove(sub1));
+ assertEquals("Unexpected size result", 2, subList.size());
+
+ assertTrue("Removing consumer from list should have succeeded", subList.remove(sub2));
+ assertEquals("Unexpected size result", 1, subList.size());
+
+ assertTrue("Removing consumer from list should have succeeded", subList.remove(sub3));
+ assertEquals("Unexpected size result", 0, subList.size());
+ }
+
+ /**
+ * Test that if the first (non-head) node in the list is removed it is no longer
+ * present in the node structure of the list at all.
+ */
+ public void testRemoveFirstNode()
+ {
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1));
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1));
+ assertNull("Should not have been a node present for the removed consumer",
+ getNodeForConsumer(_subList, _sub1));
+ assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3));
+ }
+
+ /**
+ * Test that if a central node in the list is removed it is no longer
+ * present in the node structure of the list at all.
+ */
+ public void testRemoveCentralNode()
+ {
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2));
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub2));
+ assertNull("Should not have been a node present for the removed consumer",
+ getNodeForConsumer(_subList, _sub2));
+ assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3));
+ }
+
+ /**
+ * Test that if the consumer contained in the last node of the list is removed
+ * it is no longer present in the node structure of the list at all. However,
+ * as the last node in the structure can't actually be removed a dummy will instead
+ * be present.
+ */
+ public void testRemoveLastNode()
+ {
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3));
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3));
+ assertNull("Should not have been a node present for the removed consumer",
+ getNodeForConsumer(_subList, _sub3));
+
+ //We actually expect 3 nodes to remain this time, because the last node cant be removed for thread safety reasons,
+ //however a dummy final node can be used as substitute to allow removal of the consumer node.
+ assertEquals("Unexpected number of nodes", 2 + 1, countNodes(_subList));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2));
+ }
+
+ /**
+ * Test that if the consumer not contained in the list is requested to be removed
+ * that the removal fails
+ */
+ public void testRemoveNonexistentNode()
+ {
+ QueueConsumer sub4 = newMockConsumer();
+ assertNull("Should not have been a node present for the consumer", getNodeForConsumer(_subList, sub4));
+ assertFalse("Removing consumer node should not have succeeded", _subList.remove(sub4));
+ assertEquals("Unexpected number of nodes", 3, countNodes(_subList));
+ }
+
+ /**
+ * Test that if a consumer node which occurs later in the main list than the marked node is
+ * removed from the list after the marked node is also removed, then the marker node doesn't
+ * serve to retain the subsequent nodes in the list structure (and thus memory) despite their
+ * removal.
+ */
+ public void testDeletedMarkedNodeDoesntLeakSubsequentlyDeletedNodes()
+ {
+ //get the nodes out the list for the 1st and 3rd consumers
+ QueueConsumerList.ConsumerNode sub1Node = getNodeForConsumer(_subList, _sub1);
+ assertNotNull("Should have been a node present for the consumer", sub1Node);
+ QueueConsumerList.ConsumerNode sub3Node = getNodeForConsumer(_subList, _sub3);
+ assertNotNull("Should have been a node present for the consumer", sub3Node);
+
+ //mark the first consumer node
+ assertTrue("should have succeeded in updating the marked node",
+ _subList.updateMarkedNode(_subList.getMarkedNode(), sub1Node));
+
+ //remove the 1st consumer from the list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1));
+ //verify the 1st consumer is no longer the marker node (replaced by a dummy), or in the main list structure
+ assertNotSame("Unexpected marker node", sub1Node, _subList.getMarkedNode());
+ assertNull("Should not have been a node present in the list structure for the marked-but-removed sub1 node",
+ getNodeForConsumer(_subList, _sub1));
+
+ //remove the 2nd consumer from the list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub2));
+
+ //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node
+ //in its list structure is now the 3rd consumer (since the 2nd was removed too)
+ assertEquals("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode());
+
+ //remove the 3rd and final/tail consumer
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3));
+
+ //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node
+ //in its list structure is now the dummy tail (since the 3rd consumer was removed, and a dummy
+ //tail was inserted) and NOT the 3rd sub node.
+ assertNotSame("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode());
+ assertTrue("Unexpected next node", _subList.getMarkedNode().nextNode().isDeleted());
+ assertNull("Next non-deleted node from the marker should now be the list end, i.e. null", _subList.getMarkedNode().findNext());
+ }
+
+ /**
+ * Test that the marked node 'findNext' behaviour is as expected after a consumer is added
+ * to the list following the tail consumer node being removed while it is the marked node.
+ * That is, that the new consumers node is returned by getMarkedNode().findNext().
+ */
+ public void testMarkedNodeFindsNewConsumerAfterRemovingTailWhilstMarked()
+ {
+ //get the node out the list for the 3rd consumer
+ QueueConsumerList.ConsumerNode sub3Node = getNodeForConsumer(_subList, _sub3);
+ assertNotNull("Should have been a node present for the consumer", sub3Node);
+
+ //mark the 3rd consumer node
+ assertTrue("should have succeeded in updating the marked node",
+ _subList.updateMarkedNode(_subList.getMarkedNode(), sub3Node));
+
+ //verify calling findNext on the marked node returns null, i.e. the end of the list has been reached
+ assertEquals("Unexpected node after marked node", null, _subList.getMarkedNode().findNext());
+
+ //remove the 3rd(marked) consumer from the list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3));
+
+ //add a new 4th consumer to the list
+ QueueConsumer sub4 = newMockConsumer();
+ _subList.add(sub4);
+
+ //get the node out the list for the 4th consumer
+ QueueConsumerList.ConsumerNode sub4Node = getNodeForConsumer(_subList, sub4);
+ assertNotNull("Should have been a node present for the consumer", sub4Node);
+
+ //verify the marked node (which is now a dummy substitute for the 3rd consumer) returns
+ //the 4th consumers node as the next non-deleted node.
+ assertEquals("Unexpected next node", sub4Node, _subList.getMarkedNode().findNext());
+ }
+
+ /**
+ * Test that setting the marked node to null doesn't cause problems during remove operations
+ */
+ public void testRemoveWithNullMarkedNode()
+ {
+ //set the marker to null
+ assertTrue("should have succeeded in updating the marked node",
+ _subList.updateMarkedNode(_subList.getMarkedNode(), null));
+
+ //remove the 1st consumer from the main list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1));
+
+ //verify the 1st consumer is no longer in the main list structure
+ assertNull("Should not have been a node present in the main list structure for sub1",
+ getNodeForConsumer(_subList, _sub1));
+ assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
+ }
+
+ /**
+ * Tests that after the first (non-head) node of the list is marked deleted but has not
+ * yet been removed, the iterator still skips it.
+ */
+ public void testIteratorSkipsFirstDeletedNode()
+ {
+ //'delete' but don't remove the node for the 1st consumer
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
+ assertNotNull("Should still have been a node present for the deleted consumer",
+ getNodeForConsumer(_subList, _sub1));
+
+ QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator();
+
+ //verify the iterator returns the 2nd consumers node
+ assertTrue("Iterator should have been able to advance", iter.advance());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub2, iter.getNode().getConsumer());
+
+ //verify the iterator returns the 3rd consumers node and not the 2nd.
+ assertTrue("Iterator should have been able to advance", iter.advance());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub3, iter.getNode().getConsumer());
+ }
+
+ /**
+ * Tests that after a central node of the list is marked deleted but has not yet been removed,
+ * the iterator still skips it.
+ */
+ public void testIteratorSkipsCentralDeletedNode()
+ {
+ //'delete' but don't remove the node for the 2nd consumer
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub2).delete());
+ assertNotNull("Should still have been a node present for the deleted consumer",
+ getNodeForConsumer(_subList, _sub2));
+
+ QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator();
+
+ //verify the iterator returns the 1st consumers node
+ assertTrue("Iterator should have been able to advance", iter.advance());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub1, iter.getNode().getConsumer());
+
+ //verify the iterator returns the 3rd consumers node and not the 2nd.
+ assertTrue("Iterator should have been able to advance", iter.advance());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub3, iter.getNode().getConsumer());
+ }
+
+ /**
+ * Tests that after the last node of the list is marked deleted but has not yet been removed,
+ * the iterator still skips it.
+ */
+ public void testIteratorSkipsDeletedFinalNode()
+ {
+ //'delete' but don't remove the node for the 3rd consumer
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub3).delete());
+ assertNotNull("Should still have been a node present for the deleted 3rd consumer",
+ getNodeForConsumer(_subList, _sub3));
+
+ QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator();
+
+ //verify the iterator returns the 1st consumers node
+ assertTrue("Iterator should have been able to advance", iter.advance());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub1, iter.getNode().getConsumer());
+
+ //verify the iterator returns the 2nd consumers node
+ assertTrue("Iterator should have been able to advance", iter.advance());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub2, iter.getNode().getConsumer());
+
+ //verify the iterator can no longer advance and does not return a consumer node
+ assertFalse("Iterator should not have been able to advance", iter.advance());
+ assertEquals("Iterator returned unexpected ConsumerNode", null, iter.getNode());
+ }
+}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 623118de02..36fd92004a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -29,8 +29,8 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -207,15 +207,15 @@ public class MockAMQQueue implements AMQQueue
}
@Override
- public Subscription registerSubscription(final SubscriptionTarget target,
- final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName,
- final EnumSet<Subscription.Option> options) throws AMQException
+ public Consumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ final EnumSet<Consumer.Option> options) throws AMQException
{
- return new QueueSubscription(filters, messageClass, options.contains(Subscription.Option.ACQUIRES),
- options.contains(Subscription.Option.SEES_REQUEUES), consumerName,
- options.contains(Subscription.Option.TRANSIENT), target );
+ return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES),
+ options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
+ options.contains(Consumer.Option.TRANSIENT), target );
}
public String getName()
@@ -224,22 +224,18 @@ public class MockAMQQueue implements AMQQueue
}
- public void unregisterSubscription(Subscription subscription) throws AMQException
- {
-
- }
- public Collection<Subscription> getConsumers()
+ public Collection<Consumer> getConsumers()
{
return Collections.emptyList();
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
}
@@ -254,7 +250,7 @@ public class MockAMQQueue implements AMQQueue
return 0;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return false;
}
@@ -318,11 +314,11 @@ public class MockAMQQueue implements AMQQueue
{
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ public void dequeue(QueueEntry entry, Consumer sub)
{
}
- public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
+ public boolean resend(QueueEntry entry, Consumer consumer) throws AMQException
{
return false;
}
@@ -431,12 +427,12 @@ public class MockAMQQueue implements AMQQueue
return null;
}
- public void flushSubscription(Subscription sub) throws AMQException
+ public void flushConsumer(Consumer sub) throws AMQException
{
}
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(Consumer sub)
{
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index e74bc992a2..b3a124a6bd 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -26,7 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -41,7 +41,7 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public boolean acquire(Subscription sub)
+ public boolean acquire(Consumer sub)
{
return false;
}
@@ -52,12 +52,12 @@ public class MockQueueEntry implements QueueEntry
return 0;
}
- public boolean acquiredBySubscription()
+ public boolean acquiredByConsumer()
{
return false;
}
- public boolean isAcquiredBy(Subscription subscription)
+ public boolean isAcquiredBy(Consumer consumer)
{
return false;
}
@@ -87,7 +87,7 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public Subscription getDeliveredSubscription()
+ public Consumer getDeliveredConsumer()
{
return null;
}
@@ -125,7 +125,7 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(Consumer consumer)
{
return false;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index c2099ec186..0b2c6c900c 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -21,10 +21,11 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import java.lang.reflect.Field;
@@ -112,17 +113,17 @@ public abstract class QueueEntryImplTestBase extends TestCase
*/
private void acquire()
{
- _queueEntry.acquire(newMockSubscription());
+ _queueEntry.acquire(newMockConsumer());
assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
_queueEntry.isAcquired());
}
- private Subscription newMockSubscription()
+ private Consumer newMockConsumer()
{
- final Subscription subscription = mock(Subscription.class);
- when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
- when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
- return subscription;
+ final Consumer consumer = mock(Consumer.class);
+ when(consumer.getOwningState()).thenReturn(new MessageInstance.ConsumerAcquiredState(consumer));
+ when(consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+ return consumer;
}
/**
@@ -147,34 +148,34 @@ public abstract class QueueEntryImplTestBase extends TestCase
}
/**
- * Tests rejecting a queue entry records the Subscription ID
- * for later verification by isRejectedBy(subscriptionId).
+ * Tests rejecting a queue entry records the Consumer ID
+ * for later verification by isRejectedBy(consumerId).
*/
public void testRejectAndRejectedBy()
{
- Subscription sub = newMockSubscription();
+ Consumer sub = newMockConsumer();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
- assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
+ assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+ assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
- //acquire, reject, and release the message using the subscription
+ //acquire, reject, and release the message using the consumer
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
_queueEntry.reject();
_queueEntry.release();
//verify the rejection is recorded
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
- //repeat rejection using a second subscription
- Subscription sub2 = newMockSubscription();
+ //repeat rejection using a second consumer
+ Consumer sub2 = newMockConsumer();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
+ assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
_queueEntry.reject();
- //verify it still records being rejected by both subscriptions
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
+ //verify it still records being rejected by both consumers
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
}
/**
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 51ae822b2e..542f6ba0d1 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -45,9 +45,9 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -69,8 +69,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
- private MockSubscription _subscriptionTarget = new MockSubscription();
- private QueueSubscription _subscription;
+ private MockConsumer _consumerTarget = new MockConsumer();
+ private QueueConsumer _consumer;
private Map<String,Object> _arguments = null;
@Override
@@ -162,13 +162,13 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
- public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
+ public void testRegisterConsumerThenEnqueueMessage() throws AMQException
{
ServerMessage messageA = createMessage(new Long(24));
- // Check adding a subscription adds it to the queue
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ // Check adding a consumer adds it to the queue
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
@@ -183,49 +183,49 @@ public class SimpleAMQQueueTest extends QpidTestCase
catch(InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull(_subscription.getQueueContext().getReleasedEntry());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull(_consumer.getQueueContext().getReleasedEntry());
- // Check removing the subscription removes it's information from the queue
- _subscription.close();
- assertTrue("Subscription still had queue", _subscriptionTarget.isClosed());
+ // Check removing the consumer removes it's information from the queue
+ _consumer.close();
+ assertTrue("Consumer still had queue", _consumerTarget.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
assertFalse("Queue still has active consumer",
1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB);
- assertNull(_subscription.getQueueContext());
+ assertNull(_consumer.getQueueContext());
}
- public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException
+ public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests enqueuing two messages.
*/
- public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception
+ public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageA);
_queue.enqueue(messageB);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -240,9 +240,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage messageC = createMessage(new Long(26));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -261,7 +261,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -272,12 +274,14 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 4,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -289,9 +293,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
ServerMessage messageA = createMessage(new Long(24));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -313,7 +317,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
int subFlushWaitTime = 150;
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
@@ -323,10 +329,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
- assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
+ assertEquals("Total number of messages sent should not have changed",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
@@ -343,9 +351,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -364,7 +372,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -376,35 +386,37 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 5,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
- * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a
+ * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a
* requeue resends a message to a <i>single</i> subscriber.
*/
- public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
+ public void testReleaseForQueueWithMultipleConsumers() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- MockSubscription target1 = new MockSubscription();
- MockSubscription target2 = new MockSubscription();
+ MockConsumer target1 = new MockConsumer();
+ MockConsumer target2 = new MockConsumer();
- QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
- QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
@@ -433,22 +445,22 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to both subscriptions after release",
+ assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
target1.getMessages().size() + target2.getMessages().size());
assertNull("releasedEntry should be cleared after requeue processed",
- subscription1.getQueueContext().getReleasedEntry());
+ consumer1.getQueueContext().getReleasedEntry());
assertNull("releasedEntry should be cleared after requeue processed",
- subscription2.getQueueContext().getReleasedEntry());
+ consumer2.getQueueContext().getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
{
ServerMessage messageA = createMessage(new Long(24));
- // Check adding an exclusive subscription adds it to the queue
+ // Check adding an exclusive consumer adds it to the queue
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.EXCLUSIVE));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
@@ -464,16 +476,16 @@ public class SimpleAMQQueueTest extends QpidTestCase
catch (InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
- MockSubscription subB = new MockSubscription();
+ MockConsumer subB = new MockConsumer();
Exception ex = null;
try
{
- _queue.registerSubscription(subB, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
}
catch (AMQException e)
@@ -483,16 +495,16 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertNotNull(ex);
// Check we cannot add an exclusive subscriber to a queue with an
- // existing subscription
- _subscription.close();
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ // existing consumer
+ _consumer.close();
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
try
{
- _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.EXCLUSIVE));
+ _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE));
}
catch (AMQException e)
@@ -509,12 +521,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.setDeleteOnNoConsumers(true);
ServerMessage message = createMessage(new Long(25));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
_queue.enqueue(message);
- _subscription.close();
- assertTrue("Queue was not deleted when subscription was removed",
+ _consumer.close();
+ assertTrue("Queue was not deleted when consumer was removed",
_queue.isDeleted());
}
@@ -523,13 +535,13 @@ public class SimpleAMQQueueTest extends QpidTestCase
Long id = new Long(26);
ServerMessage message = createMessage(id);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
_queue.enqueue(message);
- QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
+ QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry();
entry.setRedelivered();
- _subscription.resend(entry);
+ _consumer.resend(entry);
}
@@ -656,19 +668,19 @@ public class SimpleAMQQueueTest extends QpidTestCase
/**
* processQueue() is used when asynchronously delivering messages to
- * subscriptions which could not be delivered immediately during the
+ * consumers which could not be delivered immediately during the
* enqueue() operation.
*
* A defect within the method would mean that delivery of these messages may
* not occur should the Runner stop before all messages have been processed.
* Such a defect was discovered when Selectors were used such that one and
- * only one subscription can/will accept any given messages, but multiple
- * subscriptions are present, and one of the earlier subscriptions receives
+ * only one consumer can/will accept any given messages, but multiple
+ * consumers are present, and one of the earlier consumers receives
* more messages than the others.
*
* This test is to validate that the processQueue() method is able to
* correctly deliver all of the messages present for asynchronous delivery
- * to subscriptions in such a scenario.
+ * to consumers in such a scenario.
*/
public void testProcessQueueWithUniqueSelectors() throws Exception
{
@@ -677,10 +689,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
false, false, _virtualHost, factory, null)
{
@Override
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new subscriptions
+ // when registering the new consumers
}
};
@@ -696,28 +708,28 @@ public class SimpleAMQQueueTest extends QpidTestCase
QueueEntry msg4 = list.add(createMessage(4L));
QueueEntry msg5 = list.add(createMessage(5L));
- // Create lists of the entries each subscription should be interested
- // in.Bias over 50% of the messages to the first subscription so that
- // the later subscriptions reject them and report being done before
- // the first subscription as the processQueue method proceeds.
+ // Create lists of the entries each consumer should be interested
+ // in.Bias over 50% of the messages to the first consumer so that
+ // the later consumers reject them and report being done before
+ // the first consumer as the processQueue method proceeds.
List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
List<String> msgListSub2 = createEntriesList(msg4);
List<String> msgListSub3 = createEntriesList(msg5);
- MockSubscription sub1 = new MockSubscription(msgListSub1);
- MockSubscription sub2 = new MockSubscription(msgListSub2);
- MockSubscription sub3 = new MockSubscription(msgListSub3);
+ MockConsumer sub1 = new MockConsumer(msgListSub1);
+ MockConsumer sub2 = new MockConsumer(msgListSub2);
+ MockConsumer sub3 = new MockConsumer(msgListSub3);
- // register the subscriptions
- testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
- testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
- testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+ // register the consumers
+ testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
//check that no messages have been delivered to the
- //subscriptions during registration
+ //consumers during registration
assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
@@ -904,7 +916,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
false, "testOwner", false, false, _virtualHost, null)
{
@Override
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing
}
@@ -919,8 +931,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
// latch to wait for message receipt
final CountDownLatch latch = new CountDownLatch(messageNumber -1);
- // create a subscription
- MockSubscription subscription = new MockSubscription()
+ // create a consumer
+ MockConsumer consumer = new MockConsumer()
{
/**
* Send a message and decrement latch
@@ -937,7 +949,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
try
{
// subscribe
- testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ testQueue.addConsumer(consumer,
+ null,
+ entries.get(0).getMessage().getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -962,11 +978,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
Thread.currentThread().interrupt();
}
List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
- verifyReceivedMessages(expected, subscription.getMessages());
+ verifyReceivedMessages(expected, consumer.getMessages());
}
/**
- * Tests that entry in dequeued state are not enqueued and not delivered to subscription
+ * Tests that entry in dequeued state are not enqueued and not delivered to consumer
*/
public void testEnqueueDequeuedEntry()
{
@@ -1002,7 +1018,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
@Override
- public boolean acquire(Subscription sub)
+ public boolean acquire(Consumer sub)
{
if(message.getMessageNumber() % 2 == 0)
{
@@ -1018,24 +1034,28 @@ public class SimpleAMQQueueTest extends QpidTestCase
};
}
}, null);
- // create a subscription
- MockSubscription subscription = new MockSubscription();
+ // create a consumer
+ MockConsumer consumer = new MockConsumer();
- // register subscription
+ // register consumer
try
{
- queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
}
catch (AMQException e)
{
- fail("Failure to register subscription:" + e.getMessage());
+ fail("Failure to register consumer:" + e.getMessage());
}
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
// assert received messages
- List<MessageInstance> messages = subscription.getMessages();
+ List<MessageInstance> messages = consumer.getMessages();
assertEquals("Only 2 messages should be returned", 2, messages.size());
assertEquals("ID of first message should be 1", 1l,
(messages.get(0).getMessage()).getMessageNumber());
@@ -1048,52 +1068,60 @@ public class SimpleAMQQueueTest extends QpidTestCase
final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
"testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
- //verify adding an active subscription increases the count
- final MockSubscription subscription1 = new MockSubscription();
- subscription1.setActive(true);
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ //verify adding an active consumer increases the count
+ final MockConsumer consumer1 = new MockConsumer();
+ consumer1.setActive(true);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer1,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify adding an inactive subscription doesn't increase the count
- final MockSubscription subscription2 = new MockSubscription();
- subscription2.setActive(false);
- subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify adding an inactive consumer doesn't increase the count
+ final MockConsumer consumer2 = new MockConsumer();
+ consumer2.setActive(false);
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer2,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify behaviour in face of expected state changes:
- //verify a subscription going suspended->active increases the count
- subscription2.setState(SubscriptionTarget.State.ACTIVE);
+ //verify a consumer going suspended->active increases the count
+ consumer2.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
- //verify a subscription going active->suspended decreases the count
- subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify a consumer going active->suspended decreases the count
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going suspended->closed doesn't change the count
- subscription2.setState(SubscriptionTarget.State.CLOSED);
+ //verify a consumer going suspended->closed doesn't change the count
+ consumer2.setState(ConsumerTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going active->active doesn't change the count
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ //verify a consumer going active->active doesn't change the count
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- //verify a subscription going suspended->suspended doesn't change the count
- subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify a consumer going suspended->suspended doesn't change the count
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going active->closed decreases the count
- subscription1.setState(SubscriptionTarget.State.CLOSED);
+ //verify a consumer going active->closed decreases the count
+ consumer1.setState(ConsumerTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
}
@@ -1248,9 +1276,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
return _queue;
}
- public MockSubscription getSubscription()
+ public MockConsumer getConsumer()
{
- return _subscriptionTarget;
+ return _consumerTarget;
}
public Map<String,Object> getArguments()
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java
deleted file mode 100644
index 65ae53a3a7..0000000000
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class SubscriptionListTest extends QpidTestCase
-{
- private SubscriptionList _subList;
- private QueueSubscription _sub1;
- private QueueSubscription _sub2;
- private QueueSubscription _sub3;
- private SubscriptionList.SubscriptionNode _node;
-
- protected void setUp()
- {
- _subList = new SubscriptionList();
-
- _sub1 = newMockSubscription();
- _sub2 = newMockSubscription();
- _sub3 = newMockSubscription();
-
- _subList.add(_sub1);
- _subList.add(_sub2);
- _subList.add(_sub3);
-
- _node = _subList.getHead();
- }
-
-
- private QueueSubscription newMockSubscription()
- {
- SubscriptionTarget target = mock(SubscriptionTarget.class);
- when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
- return new QueueSubscription(null,null,true,true,"sub",false,target);
- }
-
- /**
- * Test that if the first (non-head) node in the list is deleted (but is still present),
- * it is not returned when searching through the list for the next viable node, and the
- * subsequent viable node is returned instead.
- */
- public void testFindNextSkipsFirstDeletedNode()
- {
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
-
- assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription());
-
- assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription());
- }
-
- /**
- * Test that if a central node in the list is deleted (but is still present),
- * it is not returned when searching through the list for the next viable node,
- * and the subsequent viable node is returned instead.
- */
- public void testFindNextSkipsCentralDeletedNode()
- {
- assertNotNull("Returned node should not be null", _node = _node.findNext());
-
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub2).delete());
-
- assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription());
- }
-
- /**
- * Test that if the last node in the list is deleted (but is still present),
- * it is not returned when searching through the list for the next viable node,
- * and null is returned instead.
- */
- public void testFindNextSkipsLastDeletedNode()
- {
- assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 1st subscription", _sub1, _node.getSubscription());
-
- assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription());
-
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub3).delete());
-
- assertNull("Returned node should be null", _node = _node.findNext());
- }
-
- /**
- * Test that if multiple nodes in the list are deleted (but still present), they
- * are not returned when searching through the list for the next viable node,
- * and the subsequent viable node is returned instead.
- */
- public void testFindNextSkipsMultipleDeletedNode()
- {
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub2).delete());
-
- assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription());
- }
-
- /**
- * Test that if a node in the list is marked 'deleted' it is still present in the list
- * until actually removed. counter-test to verify above testing of getNext() method.
- */
- public void testDeletedNodeStillPresent()
- {
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
-
- assertNotNull("Node marked deleted should still be present", getNodeForSubscription(_subList, _sub1));
- assertEquals("All 3 nodes are still expected to be present", 3, countNodes(_subList));
- }
-
- /**
- * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given
- * Subscription, or null if none is found.
- */
- private SubscriptionList.SubscriptionNode getNodeForSubscription(final SubscriptionList list, final Subscription sub)
- {
- SubscriptionList.SubscriptionNode node = list.getHead();
- while (node != null && node.getSubscription() != sub)
- {
- node = node.nextNode();
- }
-
- return node;
- }
-
- /**
- * Counts the number of (non-head) nodes in the list.
- */
- private int countNodes(final SubscriptionList list)
- {
- SubscriptionList.SubscriptionNode node = list.getHead();
- int count;
- for(count = -1; node != null; count++)
- {
- node = node.nextNode();
- }
-
- return count;
- }
-
- /**
- * Tests that the head is returned as expected, and isn't the node for the first subscription.
- */
- public void testGetHead()
- {
- assertNotNull("List head should be non null", _node);
- assertNotSame("Head should not be node for first subscription",
- _node, getNodeForSubscription(_subList, _sub1));
- }
-
- /**
- * Tests that the size is returned correctly in the face of additions and removals.
- */
- public void testGetSize()
- {
- SubscriptionList subList = new SubscriptionList();
-
- assertEquals("Unexpected size result", 0, subList.size());
-
- QueueSubscription sub1 = newMockSubscription();
- QueueSubscription sub2 = newMockSubscription();
- QueueSubscription sub3 = newMockSubscription();
-
- subList.add(sub1);
- assertEquals("Unexpected size result", 1, subList.size());
-
- subList.add(sub2);
- assertEquals("Unexpected size result", 2, subList.size());
-
- subList.add(sub3);
- assertEquals("Unexpected size result", 3, subList.size());
-
- assertTrue("Removing subscription from list should have succeeded", subList.remove(sub1));
- assertEquals("Unexpected size result", 2, subList.size());
-
- assertTrue("Removing subscription from list should have succeeded", subList.remove(sub2));
- assertEquals("Unexpected size result", 1, subList.size());
-
- assertTrue("Removing subscription from list should have succeeded", subList.remove(sub3));
- assertEquals("Unexpected size result", 0, subList.size());
- }
-
- /**
- * Test that if the first (non-head) node in the list is removed it is no longer
- * present in the node structure of the list at all.
- */
- public void testRemoveFirstNode()
- {
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1));
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1));
- assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub1));
- assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3));
- }
-
- /**
- * Test that if a central node in the list is removed it is no longer
- * present in the node structure of the list at all.
- */
- public void testRemoveCentralNode()
- {
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2));
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2));
- assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub2));
- assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3));
- }
-
- /**
- * Test that if the subscription contained in the last node of the list is removed
- * it is no longer present in the node structure of the list at all. However,
- * as the last node in the structure can't actually be removed a dummy will instead
- * be present.
- */
- public void testRemoveLastNode()
- {
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3));
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
- assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub3));
-
- //We actually expect 3 nodes to remain this time, because the last node cant be removed for thread safety reasons,
- //however a dummy final node can be used as substitute to allow removal of the subscription node.
- assertEquals("Unexpected number of nodes", 2 + 1, countNodes(_subList));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2));
- }
-
- /**
- * Test that if the subscription not contained in the list is requested to be removed
- * that the removal fails
- */
- public void testRemoveNonexistentNode()
- {
- QueueSubscription sub4 = newMockSubscription();
- assertNull("Should not have been a node present for the subscription", getNodeForSubscription(_subList, sub4));
- assertFalse("Removing subscription node should not have succeeded", _subList.remove(sub4));
- assertEquals("Unexpected number of nodes", 3, countNodes(_subList));
- }
-
- /**
- * Test that if a subscription node which occurs later in the main list than the marked node is
- * removed from the list after the marked node is also removed, then the marker node doesn't
- * serve to retain the subsequent nodes in the list structure (and thus memory) despite their
- * removal.
- */
- public void testDeletedMarkedNodeDoesntLeakSubsequentlyDeletedNodes()
- {
- //get the nodes out the list for the 1st and 3rd subscriptions
- SubscriptionList.SubscriptionNode sub1Node = getNodeForSubscription(_subList, _sub1);
- assertNotNull("Should have been a node present for the subscription", sub1Node);
- SubscriptionList.SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3);
- assertNotNull("Should have been a node present for the subscription", sub3Node);
-
- //mark the first subscription node
- assertTrue("should have succeeded in updating the marked node",
- _subList.updateMarkedNode(_subList.getMarkedNode(), sub1Node));
-
- //remove the 1st subscription from the list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1));
- //verify the 1st subscription is no longer the marker node (replaced by a dummy), or in the main list structure
- assertNotSame("Unexpected marker node", sub1Node, _subList.getMarkedNode());
- assertNull("Should not have been a node present in the list structure for the marked-but-removed sub1 node",
- getNodeForSubscription(_subList, _sub1));
-
- //remove the 2nd subscription from the list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2));
-
- //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node
- //in its list structure is now the 3rd subscription (since the 2nd was removed too)
- assertEquals("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode());
-
- //remove the 3rd and final/tail subscription
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
-
- //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node
- //in its list structure is now the dummy tail (since the 3rd subscription was removed, and a dummy
- //tail was inserted) and NOT the 3rd sub node.
- assertNotSame("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode());
- assertTrue("Unexpected next node", _subList.getMarkedNode().nextNode().isDeleted());
- assertNull("Next non-deleted node from the marker should now be the list end, i.e. null", _subList.getMarkedNode().findNext());
- }
-
- /**
- * Test that the marked node 'findNext' behaviour is as expected after a subscription is added
- * to the list following the tail subscription node being removed while it is the marked node.
- * That is, that the new subscriptions node is returned by getMarkedNode().findNext().
- */
- public void testMarkedNodeFindsNewSubscriptionAfterRemovingTailWhilstMarked()
- {
- //get the node out the list for the 3rd subscription
- SubscriptionList.SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3);
- assertNotNull("Should have been a node present for the subscription", sub3Node);
-
- //mark the 3rd subscription node
- assertTrue("should have succeeded in updating the marked node",
- _subList.updateMarkedNode(_subList.getMarkedNode(), sub3Node));
-
- //verify calling findNext on the marked node returns null, i.e. the end of the list has been reached
- assertEquals("Unexpected node after marked node", null, _subList.getMarkedNode().findNext());
-
- //remove the 3rd(marked) subscription from the list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
-
- //add a new 4th subscription to the list
- QueueSubscription sub4 = newMockSubscription();
- _subList.add(sub4);
-
- //get the node out the list for the 4th subscription
- SubscriptionList.SubscriptionNode sub4Node = getNodeForSubscription(_subList, sub4);
- assertNotNull("Should have been a node present for the subscription", sub4Node);
-
- //verify the marked node (which is now a dummy substitute for the 3rd subscription) returns
- //the 4th subscriptions node as the next non-deleted node.
- assertEquals("Unexpected next node", sub4Node, _subList.getMarkedNode().findNext());
- }
-
- /**
- * Test that setting the marked node to null doesn't cause problems during remove operations
- */
- public void testRemoveWithNullMarkedNode()
- {
- //set the marker to null
- assertTrue("should have succeeded in updating the marked node",
- _subList.updateMarkedNode(_subList.getMarkedNode(), null));
-
- //remove the 1st subscription from the main list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1));
-
- //verify the 1st subscription is no longer in the main list structure
- assertNull("Should not have been a node present in the main list structure for sub1",
- getNodeForSubscription(_subList, _sub1));
- assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
- }
-
- /**
- * Tests that after the first (non-head) node of the list is marked deleted but has not
- * yet been removed, the iterator still skips it.
- */
- public void testIteratorSkipsFirstDeletedNode()
- {
- //'delete' but don't remove the node for the 1st subscription
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
- assertNotNull("Should still have been a node present for the deleted subscription",
- getNodeForSubscription(_subList, _sub1));
-
- SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator();
-
- //verify the iterator returns the 2nd subscriptions node
- assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription());
-
- //verify the iterator returns the 3rd subscriptions node and not the 2nd.
- assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription());
- }
-
- /**
- * Tests that after a central node of the list is marked deleted but has not yet been removed,
- * the iterator still skips it.
- */
- public void testIteratorSkipsCentralDeletedNode()
- {
- //'delete' but don't remove the node for the 2nd subscription
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub2).delete());
- assertNotNull("Should still have been a node present for the deleted subscription",
- getNodeForSubscription(_subList, _sub2));
-
- SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator();
-
- //verify the iterator returns the 1st subscriptions node
- assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription());
-
- //verify the iterator returns the 3rd subscriptions node and not the 2nd.
- assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription());
- }
-
- /**
- * Tests that after the last node of the list is marked deleted but has not yet been removed,
- * the iterator still skips it.
- */
- public void testIteratorSkipsDeletedFinalNode()
- {
- //'delete' but don't remove the node for the 3rd subscription
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub3).delete());
- assertNotNull("Should still have been a node present for the deleted 3rd subscription",
- getNodeForSubscription(_subList, _sub3));
-
- SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator();
-
- //verify the iterator returns the 1st subscriptions node
- assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription());
-
- //verify the iterator returns the 2nd subscriptions node
- assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription());
-
- //verify the iterator can no longer advance and does not return a subscription node
- assertFalse("Iterator should not have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", null, iter.getNode());
- }
-}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 45643e2f9d..2093490ee2 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -34,8 +33,8 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -46,7 +45,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
private static final Option[] BATCHED = new Option[] { Option.BATCH };
@@ -69,16 +68,16 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private Subscription _subscription;
+ private Consumer _consumer;
- public SubscriptionTarget_0_10(ServerSession session,
- String name,
- MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- MessageFlowMode flowMode,
- FlowCreditManager_0_10 creditManager,
- Map<String, Object> arguments)
+ public ConsumerTarget_0_10(ServerSession session,
+ String name,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ Map<String, Object> arguments)
{
super(State.SUSPENDED);
_session = session;
@@ -93,9 +92,9 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
_name = name;
}
- public Subscription getSubscription()
+ public Consumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
public boolean isSuspended()
@@ -108,7 +107,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
boolean closed = false;
State state = getState();
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -124,7 +123,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
return closed;
@@ -255,8 +254,8 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- xfr = batch ? new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+ xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody());
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
@@ -353,7 +352,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
{
entry.setRedelivered();
entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(getSubscription()))
+ if(entry.isAcquiredBy(getConsumer()))
{
entry.delete();
}
@@ -450,7 +449,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
{
try
{
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
updateState(State.ACTIVE, State.SUSPENDED);
_stopped.set(true);
@@ -459,7 +458,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
}
@@ -519,7 +518,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
public void acknowledge(MessageInstance entry)
{
// TODO Fix Store Context / cleanup
- if(entry.isAcquiredBy(getSubscription()))
+ if(entry.isAcquiredBy(getConsumer()))
{
_unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
_unacknowledgedCount.decrementAndGet();
@@ -530,7 +529,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
public void flush() throws AMQException
{
flushCreditState(true);
- getSubscription().flush();
+ getConsumer().flush();
stop();
}
@@ -560,13 +559,13 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
+ _consumer = sub;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index b7788911c7..4420709a91 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -32,9 +31,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final MessageInstance _entry;
- private final SubscriptionTarget_0_10 _target;
+ private final ConsumerTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
_target = target;
@@ -42,7 +41,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
_target.getSessionModel().acknowledge(_target, _entry);
}
@@ -55,7 +54,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
_target.release(_entry, setRedelivered);
}
@@ -67,7 +66,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
_target.reject(_entry);
}
@@ -80,7 +79,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- return _entry.acquire(_target.getSubscription());
+ return _entry.acquire(_target.getConsumer());
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
index 0c238c4d55..c459364dbb 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
@@ -31,9 +30,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final MessageInstance _entry;
- private SubscriptionTarget_0_10 _target;
+ private ConsumerTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
_target = target;
@@ -46,7 +45,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_target.getSubscription()))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
_target.release(_entry, setRedelivered);
}
@@ -58,7 +57,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_entry.isAcquiredBy(_target.getSubscription()))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
_target.reject(_entry);
}
@@ -71,7 +70,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- boolean acquired = _entry.acquire(_target.getSubscription());
+ boolean acquired = _entry.acquire(_target.getConsumer());
if(acquired)
{
_target.recordUnacknowledged(_entry);
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index 71ad60c7b8..cd1146ac0b 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -22,17 +22,16 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
- private final SubscriptionTarget_0_10 _sub;
+ private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
{
super();
_sub = sub;
@@ -47,7 +46,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
{
_sub.restoreCredit(_entry.getMessage());
}
- if(_entry.isAcquiredBy(_sub.getSubscription()))
+ if(_entry.isAcquiredBy(_sub.getConsumer()))
{
_session.acknowledge(_sub, _entry);
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 6348510b09..c85a415ce5 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate
private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<SubscriptionTarget_0_10> subs = ssn.getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subs)
+ final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subs)
{
subscription_0_10.stop();
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 73263bd931..67b3740bb6 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -63,7 +63,6 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -137,7 +136,7 @@ public class ServerSession extends Session
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>();
+ private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
@@ -400,7 +399,7 @@ public class ServerSession extends Session
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final SubscriptionTarget_0_10 sub, final MessageInstance entry)
+ public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
{
_transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
new ServerTransaction.Action()
@@ -421,22 +420,22 @@ public class ServerSession extends Session
});
}
- public Collection<SubscriptionTarget_0_10> getSubscriptions()
+ public Collection<ConsumerTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
- public void register(String destination, SubscriptionTarget_0_10 sub)
+ public void register(String destination, ConsumerTarget_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
- public SubscriptionTarget_0_10 getSubscription(String destination)
+ public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
}
- public void unregister(SubscriptionTarget_0_10 sub)
+ public void unregister(ConsumerTarget_0_10 sub)
{
_subscriptions.remove(sub.getName());
sub.close();
@@ -808,8 +807,8 @@ public class ServerSession extends Session
void unregisterSubscriptions()
{
- final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
@@ -817,8 +816,8 @@ public class ServerSession extends Session
void stopSubscriptions()
{
- final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.stop();
}
@@ -827,8 +826,8 @@ public class ServerSession extends Session
public void receivedComplete()
{
- final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.flushCreditState(false);
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 515c1e6f8f..d1b24a2fdb 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -46,7 +46,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -257,7 +257,7 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination,
+ ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
method.getAcceptMode(),
method.getAcquireMode(),
MessageFlowMode.WINDOW,
@@ -268,31 +268,31 @@ public class ServerSessionDelegate extends SessionDelegate
((ServerSession)session).register(destination, target);
try
{
- EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
{
- options.add(Subscription.Option.ACQUIRES);
+ options.add(Consumer.Option.ACQUIRES);
}
if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- options.add(Subscription.Option.SEES_REQUEUES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
if(method.getExclusive())
{
- options.add(Subscription.Option.EXCLUSIVE);
+ options.add(Consumer.Option.EXCLUSIVE);
}
- Subscription sub =
- queue.registerSubscription(target,
- filterManager,
- MessageTransferMessage.class,
- destination,
- options);
+ Consumer sub =
+ queue.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options);
}
- catch (AMQQueue.ExistingExclusiveSubscription existing)
+ catch (AMQQueue.ExistingExclusiveConsumer existing)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
}
@@ -405,7 +405,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -422,7 +422,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1476,7 +1476,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = sfm.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1493,7 +1493,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = stop.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1511,7 +1511,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = flow.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index ce1a132973..d2b1f83513 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -69,8 +69,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -113,7 +112,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>();
+ private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
private final MessageStore _messageStore;
@@ -488,10 +487,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- public Subscription getSubscription(AMQShortString tag)
+ public Consumer getSubscription(AMQShortString tag)
{
- final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
- return target == null ? null : target.getSubscription();
+ final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+ return target == null ? null : target.getConsumer();
}
/**
@@ -522,30 +521,30 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
throw new AMQException("Consumer already exists with same tag: " + tag);
}
- SubscriptionTarget_0_8 target;
- EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
- options.add(Subscription.Option.TRANSIENT);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.TRANSIENT);
}
else if(acks)
{
- target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
else
{
- target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
if(exclusive)
{
- options.add(Subscription.Option.EXCLUSIVE);
+ options.add(Consumer.Option.EXCLUSIVE);
}
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
@@ -557,8 +556,12 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
try
{
- Subscription sub =
- queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options);
+ Consumer sub =
+ queue.addConsumer(target,
+ FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
}
catch (AMQException e)
{
@@ -582,8 +585,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- Subscription sub = target == null ? null : target.getSubscription();
+ ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+ Consumer sub = target == null ? null : target.getConsumer();
if (sub != null)
{
sub.close();
@@ -651,14 +654,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
+ for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
if (_logger.isInfoEnabled())
{
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Subscription sub = me.getValue().getSubscription();
+ Consumer sub = me.getValue().getConsumer();
sub.close();
@@ -674,14 +677,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
- * @param subscription The consumer that is to acknowledge this message.
+ * @param consumer The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Subscription subscription)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
{
if (_logger.isDebugEnabled())
{
_logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") for " + subscription);
+ + ") for " + consumer);
}
@@ -928,9 +931,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (wasSuspended)
{
// may need to deliver queued messages
- for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getSubscription().externalStateChange();
+ s.getConsumer().externalStateChange();
}
}
@@ -944,15 +947,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (!wasSuspended)
{
// may need to deliver queued messages
- for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
try
{
- s.getSubscription().getSendLock();
+ s.getConsumer().getSendLock();
}
finally
{
- s.getSubscription().releaseSendLock();
+ s.getConsumer().releaseSendLock();
}
}
}
@@ -1029,10 +1032,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
boolean requiresSuspend = _suspended.compareAndSet(false,true);
// ensure all subscriptions have seen the change to the channel state
- for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSubscription().getSendLock();
- sub.getSubscription().releaseSendLock();
+ sub.getConsumer().getSendLock();
+ sub.getConsumer().releaseSendLock();
}
try
@@ -1052,7 +1055,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
for(MessageInstance entry : _resendList)
{
- Subscription sub = entry.getDeliveredSubscription();
+ Consumer sub = entry.getDeliveredConsumer();
if(sub == null || sub.isClosed())
{
entry.release();
@@ -1067,9 +1070,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(requiresSuspend)
{
_suspended.set(false);
- for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSubscription().externalStateChange();
+ sub.getConsumer().externalStateChange();
}
}
@@ -1125,7 +1128,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1472,7 +1475,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final Subscription sub = rejectedQueueEntry.getDeliveredSubscription();
+ final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 131b88fee4..e83e86981b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -94,7 +94,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -1668,7 +1668,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message,
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
throws AMQException
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index 7f52aa9a08..2e362c11f8 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -23,10 +23,10 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
public interface ClientDeliveryMethod
{
- void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props,
+ void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props,
final long deliveryTag) throws AMQException;
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 9a6cec87bd..536bc18964 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -33,8 +33,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
* that was given out by the broker and the channel id. <p/>
*/
-public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
@@ -70,23 +70,23 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private Subscription _subscription;
+ private Consumer _consumer;
- public static SubscriptionTarget_0_8 createBrowserTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager) throws AMQException
{
- return new BrowserSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
- static final class BrowserSubscription extends SubscriptionTarget_0_8
+ static final class BrowserConsumer extends ConsumerTarget_0_8
{
- public BrowserSubscription(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public BrowserConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag,
@@ -124,31 +124,31 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
- public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager) throws AMQException
{
- return new NoAckSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
- public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod) throws AMQException
{
- return new NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public static class NoAckSubscription extends SubscriptionTarget_0_8
+ public static class NoAckConsumer extends ConsumerTarget_0_8
{
private final AutoCommitTransaction _txn;
- public NoAckSubscription(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public NoAckConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -221,13 +221,13 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
/**
* NoAck Subscription for use with BasicGet method.
*/
- public static final class GetNoAckSubscription extends SubscriptionTarget_0_8.NoAckSubscription
+ public static final class GetNoAckConsumer extends NoAckConsumer
{
- public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -241,32 +241,32 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
- public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager)
throws AMQException
{
- return new AckSubscription(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
- public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
throws AMQException
{
- return new AckSubscription(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+ return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
}
- static final class AckSubscription extends SubscriptionTarget_0_8
+ static final class AckConsumer extends ConsumerTarget_0_8
{
- public AckSubscription(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public AckConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -305,7 +305,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
- private static final Logger _logger = Logger.getLogger(SubscriptionTarget_0_8.class);
+ private static final Logger _logger = Logger.getLogger(ConsumerTarget_0_8.class);
private final AMQChannel _channel;
@@ -320,12 +320,12 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
- public SubscriptionTarget_0_8(AMQChannel channel,
- AMQShortString consumerTag,
- FieldTable arguments,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public ConsumerTarget_0_8(AMQChannel channel,
+ AMQShortString consumerTag,
+ FieldTable arguments,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(State.ACTIVE);
@@ -357,20 +357,20 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
}
- public Subscription getSubscription()
+ public Consumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
+ _consumer = sub;
}
public AMQSessionModel getSessionModel()
@@ -417,7 +417,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
boolean closed = false;
State state = getState();
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -433,7 +433,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
}
@@ -488,14 +488,14 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
throws AMQException
{
- _deliveryMethod.deliverToClient(getSubscription(), message, props, deliveryTag);
+ _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
}
protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
{
- _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag);
+ _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
index 06c1d79439..1de1638c2e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
@@ -24,11 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.consumer.Consumer;
import java.util.Map;
@@ -53,11 +49,11 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
{
message.setRedelivered();
- final Subscription subscription = message.getDeliveredSubscription();
- if (subscription != null)
+ final Consumer consumer = message.getDeliveredConsumer();
+ if (consumer != null)
{
// Consumer exists
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
_msgToResend.put(deliveryTag, message);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
index 89a5aa55c1..70d7da3432 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
public interface RecordDeliveryMethod
{
- void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag);
+ void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index 836de44f4e..c2d02c1df8 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -156,14 +156,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
- catch (AMQQueue.ExistingExclusiveSubscription e)
+ catch (AMQQueue.ExistingExclusiveConsumer e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 53c0ae7381..d4bd486a99 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -38,14 +38,13 @@ import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.EnumSet;
@@ -133,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message, final
+ public void deliverToClient(final Consumer sub, final ServerMessage message, final
InstanceProperties props, final long deliveryTag)
throws AMQException
{
@@ -150,30 +149,30 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
};
- SubscriptionTarget_0_8 target;
- EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES);
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES);
if(acks)
{
- target = SubscriptionTarget_0_8.createAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
else
{
- target = SubscriptionTarget_0_8.createNoAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = ConsumerTarget_0_8.createNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options);
+ Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
sub.flush();
sub.close();
return(!singleMessageCredit.hasCredit());
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index d542c82a5b..d9352f34f7 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -30,10 +30,9 @@ import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -49,8 +48,8 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
- private SubscriptionTarget_0_8 _subscriptionTarget;
- private Subscription _subscription;
+ private ConsumerTarget_0_8 _subscriptionTarget;
+ private Consumer _consumer;
private AMQProtocolSession _protocolSession;
@@ -180,10 +179,13 @@ public class AckTest extends QpidTestCase
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -207,16 +209,16 @@ public class AckTest extends QpidTestCase
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget,
- null,
- AMQMessage.class,
- DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -233,12 +235,12 @@ public class AckTest extends QpidTestCase
{
// false arg means no acks expected
- _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -256,13 +258,13 @@ public class AckTest extends QpidTestCase
public void testSingleAckReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -293,13 +295,13 @@ public class AckTest extends QpidTestCase
public void testMultiAckReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -327,13 +329,13 @@ public class AckTest extends QpidTestCase
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -364,15 +366,15 @@ public class AckTest extends QpidTestCase
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 1;
publishMessages(msgCount);
- _subscription.externalStateChange();
+ _consumer.externalStateChange();
_channel.acknowledgeMessage(1, false);
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index 9fa9e49c6b..e8c922b023 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -27,9 +27,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -65,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue;
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
- private Subscription _subscription;
+ private Consumer _consumer;
private boolean _queueDeleted;
@Override
@@ -76,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase
_queue = mock(AMQQueue.class);
when(_queue.getName()).thenReturn(getName());
when(_queue.isDeleted()).thenReturn(_queueDeleted);
- _subscription = mock(Subscription.class);
- when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+ _consumer = mock(Consumer.class);
+ when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
long id = 0;
@@ -123,7 +121,7 @@ public class ExtractResendAndRequeueTest extends TestCase
// Acquire messages in subscription
for(QueueEntry entry : messageList)
{
- when(entry.getDeliveredSubscription()).thenReturn(_subscription);
+ when(entry.getDeliveredConsumer()).thenReturn(_consumer);
}
}
@@ -168,7 +166,7 @@ public class ExtractResendAndRequeueTest extends TestCase
acquireMessages(_referenceList);
// Close subscription
- when(_subscription.isClosed()).thenReturn(true);
+ when(_consumer.isClosed()).thenReturn(true);
final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 94dd0acf99..1fad8fb41f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -49,7 +49,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -245,7 +245,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
@Override
- public void deliverToClient(Subscription sub, ServerMessage message,
+ public void deliverToClient(Consumer sub, ServerMessage message,
InstanceProperties props, long deliveryTag) throws AMQException
{
_deliveryCount.incrementAndGet();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index 8c716a0b56..247bc53cd1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -131,7 +131,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
// indicate we are using the prefetch credit. i.e. using acks not No-Ack
assertTrue("The subscription has been suspended",
!getChannel().getSubscription(browser).getState()
- .equals(Subscription.State.SUSPENDED));
+ .equals(Consumer.State.SUSPENDED));
}
private void checkStoreContents(int messageCount)
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index a3793392d5..027c40aabe 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -43,15 +43,14 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import java.nio.ByteBuffer;
import java.util.List;
-class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
+class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
private final boolean _acquires;
private SendingLink_1_0 _link;
@@ -61,10 +60,10 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
private final SectionEncoder _sectionEncoder;
- private Subscription _subscription;
+ private Consumer _consumer;
- public SubscriptionTarget_1_0(final SendingLink_1_0 link,
- boolean acquires)
+ public ConsumerTarget_1_0(final SendingLink_1_0 link,
+ boolean acquires)
{
super(State.SUSPENDED);
_link = link;
@@ -73,9 +72,9 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
_acquires = acquires;
}
- public Subscription getSubscription()
+ public Consumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
private SendingLinkEndpoint getEndpoint()
@@ -94,7 +93,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
boolean closed = false;
State state = getState();
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -109,7 +108,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
}
@@ -255,7 +254,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
public void onRollback()
{
- if(queueEntry.isAcquiredBy(getSubscription()))
+ if(queueEntry.isAcquiredBy(getConsumer()))
{
queueEntry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -385,7 +384,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
public void postCommit()
{
- if(_queueEntry.isAcquiredBy(getSubscription()))
+ if(_queueEntry.isAcquiredBy(getConsumer()))
{
_queueEntry.delete();
}
@@ -499,13 +498,13 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
}
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
+ _consumer = sub;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 390944ad3a..2fff1856c7 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -68,8 +68,7 @@ import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -82,8 +81,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
private VirtualHost _vhost;
private SendingDestination _destination;
- private Subscription _subscription;
- private SubscriptionTarget_1_0 _target;
+ private Consumer _consumer;
+ private ConsumerTarget_1_0 _target;
private boolean _draining;
private final Map<Binary, MessageInstance> _unsettledMap =
@@ -112,7 +111,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
- EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
boolean noLocal = false;
@@ -175,11 +174,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+ _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
if(source.getDistributionMode() != StdDistMode.COPY)
{
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
}
@@ -376,9 +375,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
- _target = new SubscriptionTarget_1_0(this, true);
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ _target = new ConsumerTarget_1_0(this, true);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
else
@@ -390,18 +389,18 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
if(noLocal)
{
- options.add(Subscription.Option.NO_LOCAL);
+ options.add(Consumer.Option.NO_LOCAL);
}
- _subscription.setNoLocal(noLocal);
+ _consumer.setNoLocal(noLocal);
try
{
- _subscription = _queue.registerSubscription(_target,
- messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class, getEndpoint().getName(), options);
+ _consumer = _queue.addConsumer(_target,
+ messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class, getEndpoint().getName(), options);
}
catch (AMQException e)
{
@@ -428,7 +427,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
try
{
- _subscription.close();
+ _consumer.close();
}
catch (AMQException e)
@@ -622,7 +621,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
{
- if(_subscription.isActive())
+ if(_consumer.isActive())
{
_target.suspend();
}
@@ -653,7 +652,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(outcome instanceof Accepted)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -673,7 +672,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
else if(outcome instanceof Released)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
index a71d833fc3..9ca23ce1ce 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
@@ -20,7 +20,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -40,7 +39,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -327,8 +326,8 @@ public class MessageServlet extends AbstractServlet
: entry.isAcquired()
? "Acquired"
: "");
- final Subscription deliveredSubscription = entry.getDeliveredSubscription();
- object.put("deliveredTo", deliveredSubscription == null ? null : deliveredSubscription.getSubscriptionID());
+ final Consumer deliveredConsumer = entry.getDeliveredConsumer();
+ object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getId());
ServerMessage message = entry.getMessage();
if(message != null)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
index 4defbd7eb0..db6da6f739 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
@@ -46,7 +46,7 @@ import java.util.List;
* SUB-1002 : Close
* SUB-1003 : State : <state>
*/
-public class SubscriptionLoggingTest extends AbstractTestLogging
+public class ConsumerLoggingTest extends AbstractTestLogging
{
static final String SUB_PREFIX = "SUB-";