diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-05 10:29:55 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-05 10:29:55 +0000 |
commit | 7a54b9a25cf96675325a8cb6bfd1d2e4f43b8edd (patch) | |
tree | 0649e5d5fc37c77d4e6d79249d9a3e92be7c0f03 | |
parent | e993cb9572e57416b1a48cb64e9d4b90b2f16ea2 (diff) | |
download | qpid-python-7a54b9a25cf96675325a8cb6bfd1d2e4f43b8edd.tar.gz |
rename subscription to consumer
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564703 13f79535-47bb-0310-9956-ffa450edef68
55 files changed, 1398 insertions, 1399 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 4d9550907e..aa721e598a 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -18,20 +18,20 @@ * under the License. * */ -package org.apache.qpid.server.subscription; +package org.apache.qpid.server.consumer; import org.apache.qpid.server.util.StateChangeListener; import java.util.concurrent.atomic.AtomicReference; -public abstract class AbstractSubscriptionTarget implements SubscriptionTarget +public abstract class AbstractConsumerTarget implements ConsumerTarget { private final AtomicReference<State> _state; - private final AtomicReference<StateChangeListener<SubscriptionTarget, State>> _stateListener = - new AtomicReference<StateChangeListener<SubscriptionTarget, State>>(); + private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener = + new AtomicReference<StateChangeListener<ConsumerTarget, State>>(); - protected AbstractSubscriptionTarget(final State initialState) + protected AbstractConsumerTarget(final State initialState) { _state = new AtomicReference<State>(initialState); } @@ -46,7 +46,7 @@ public abstract class AbstractSubscriptionTarget implements SubscriptionTarget { if(_state.compareAndSet(from, to)) { - StateChangeListener<SubscriptionTarget, State> listener = _stateListener.get(); + StateChangeListener<ConsumerTarget, State> listener = _stateListener.get(); if(listener != null) { listener.stateChanged(this, from, to); @@ -60,12 +60,12 @@ public abstract class AbstractSubscriptionTarget implements SubscriptionTarget } - public final void setStateListener(StateChangeListener<SubscriptionTarget, State> listener) + public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener) { _stateListener.set(listener); } - public final StateChangeListener<SubscriptionTarget, State> getStateListener() + public final StateChangeListener<ConsumerTarget, State> getStateListener() { return _stateListener.get(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java index 572b076ba2..4c1ebc3547 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.subscription; +package org.apache.qpid.server.consumer; import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.AMQException; @@ -27,7 +27,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.StateChangeListener; -public interface Subscription +public interface Consumer { AtomicLong SUB_ID_GENERATOR = new AtomicLong(0); @@ -63,11 +63,11 @@ public interface Subscription AMQSessionModel getSessionModel(); - MessageInstance.SubscriptionAcquiredState getOwningState(); + MessageInstance.ConsumerAcquiredState getOwningState(); void setNoLocal(boolean noLocal); - long getSubscriptionID(); + long getId(); boolean isSuspended(); @@ -101,7 +101,7 @@ public interface Subscription void restoreCredit(final MessageInstance queueEntry); - void setStateListener(final StateChangeListener<? extends Subscription, State> listener); + void setStateListener(final StateChangeListener<? extends Consumer, State> listener); public State getState(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 80298cccc8..92579475ed 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.subscription; +package org.apache.qpid.server.consumer; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.MessageInstance; @@ -26,7 +26,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.StateChangeListener; -public interface SubscriptionTarget +public interface ConsumerTarget { @@ -37,11 +37,11 @@ public interface SubscriptionTarget State getState(); - void subscriptionRegistered(Subscription sub); + void consumerAdded(Consumer sub); - void subscriptionRemoved(Subscription sub); + void consumerRemoved(Consumer sub); - void setStateListener(StateChangeListener<SubscriptionTarget, State> listener); + void setStateListener(StateChangeListener<ConsumerTarget, State> listener); long getUnacknowledgedBytes(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java index 6251471139..91d9ef7dbc 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.logging.LogSubject; * 2) We can set new actors at the point we have enough information. i.e. * - Set a low level ConnectionActor when processing bytes from the wire. * - Set a ChannelActor when we are processing the frame - * - Set a SubscriptionActor when we are handling the subscription. * <p/> * The code performing the logging need not worry about what type of actor is * currently set so can perform its logging. The resulting log entry though will diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 80fa93c417..733fded846 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -25,7 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -49,25 +49,25 @@ public interface MessageInstance boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener); - boolean acquiredBySubscription(); + boolean acquiredByConsumer(); - boolean isAcquiredBy(Subscription subscription); + boolean isAcquiredBy(Consumer consumer); void setRedelivered(); boolean isRedelivered(); - Subscription getDeliveredSubscription(); + Consumer getDeliveredConsumer(); void reject(); - boolean isRejectedBy(Subscription subscription); + boolean isRejectedBy(Consumer consumer); boolean getDeliveredToConsumer(); boolean expired() throws AMQException; - boolean acquire(Subscription sub); + boolean acquire(Consumer sub); int getMaximumDeliveryCount(); @@ -148,7 +148,7 @@ public interface MessageInstance } } - public final class NonSubscriptionAcquiredState extends EntryState + public final class NonConsumerAcquiredState extends EntryState { public State getState() { @@ -161,13 +161,13 @@ public interface MessageInstance } } - public final class SubscriptionAcquiredState extends EntryState + public final class ConsumerAcquiredState extends EntryState { - private final Subscription _subscription; + private final Consumer _consumer; - public SubscriptionAcquiredState(Subscription subscription) + public ConsumerAcquiredState(Consumer consumer) { - _subscription = subscription; + _consumer = consumer; } @@ -176,14 +176,14 @@ public interface MessageInstance return State.ACQUIRED; } - public Subscription getSubscription() + public Consumer getConsumer() { - return _subscription; + return _consumer; } public String toString() { - return "{" + getState().name() + " : " + _subscription +"}"; + return "{" + getState().name() + " : " + _consumer +"}"; } } @@ -191,7 +191,7 @@ public interface MessageInstance final static EntryState AVAILABLE_STATE = new AvailableState(); final static EntryState DELETED_STATE = new DeletedState(); final static EntryState DEQUEUED_STATE = new DequeuedState(); - final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); + final static EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState(); boolean isAvailable(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java index e813d0c129..355a1cf3b1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java @@ -77,6 +77,6 @@ public interface Session extends ConfiguredObject CHANNEL_ID, PRODUCER_FLOW_BLOCKED)); - Collection<Consumer> getSubscriptions(); + Collection<Consumer> getConsumers(); Collection<Publisher> getPublishers(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java index 850a6c9d80..cf6874030b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java @@ -22,33 +22,32 @@ package org.apache.qpid.server.model.adapter; import java.util.Map; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import java.security.AccessControlException; import java.util.Collection; import java.util.Collections; -public class ConsumerAdapter extends AbstractAdapter implements Consumer +public class ConsumerAdapter extends AbstractAdapter implements org.apache.qpid.server.model.Consumer { - private final Subscription _subscription; + private final Consumer _consumer; private final QueueAdapter _queue; private final SessionAdapter _session; private final ConsumerStatistics _statistics; public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter, - final Subscription subscription) + final Consumer consumer) { super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(), queueAdapter.getName(), - subscription.getSessionModel().getConnectionModel().getRemoteAddressString(), - String.valueOf(subscription.getSessionModel().getChannelId()), - subscription.getName()), queueAdapter.getTaskExecutor()); - _subscription = subscription; + consumer.getSessionModel().getConnectionModel().getRemoteAddressString(), + String.valueOf(consumer.getSessionModel().getChannelId()), + consumer.getName()), queueAdapter.getTaskExecutor()); + _consumer = consumer; _queue = queueAdapter; _session = sessionAdapter; _statistics = new ConsumerStatistics(); @@ -57,7 +56,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer public String getName() { - return _subscription.getName(); + return _consumer.getName(); } public String setName(final String currentName, final String desiredName) @@ -107,7 +106,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer @Override public Collection<String> getAttributeNames() { - return Consumer.AVAILABLE_ATTRIBUTES; + return org.apache.qpid.server.model.Consumer.AVAILABLE_ATTRIBUTES; } @Override @@ -147,7 +146,7 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer } else if(DISTRIBUTION_MODE.equals(name)) { - return _subscription.acquires() ? "MOVE" : "COPY"; + return _consumer.acquires() ? "MOVE" : "COPY"; } else if(SETTLEMENT_MODE.equals(name)) { @@ -197,11 +196,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer { if(name.equals(BYTES_OUT)) { - return _subscription.getBytesOut(); + return _consumer.getBytesOut(); } else if(name.equals(MESSAGES_OUT)) { - return _subscription.getMessagesOut(); + return _consumer.getMessagesOut(); } else if(name.equals(STATE_CHANGED)) { @@ -209,11 +208,11 @@ public class ConsumerAdapter extends AbstractAdapter implements Consumer } else if(name.equals(UNACKNOWLEDGED_BYTES)) { - return _subscription.getUnacknowledgedBytes(); + return _consumer.getUnacknowledgedBytes(); } else if(name.equals(UNACKNOWLEDGED_MESSAGES)) { - return _subscription.getUnacknowledgedMessages(); + return _consumer.getUnacknowledgedMessages(); } return null; // TODO - Implement } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 074f7c243b..d59b13902b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFinder; -import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; @@ -47,10 +46,11 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.MapValueConverter; -final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener +final class QueueAdapter extends AbstractAdapter implements Queue, + AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener { @SuppressWarnings("serial") static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{ @@ -69,8 +69,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs private final AMQQueue _queue; private final Map<Binding, BindingAdapter> _bindingAdapters = new HashMap<Binding, BindingAdapter>(); - private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters = - new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>(); + private Map<Consumer, ConsumerAdapter> _consumerAdapters = + new HashMap<Consumer, ConsumerAdapter>(); private final VirtualHostAdapter _vhost; @@ -84,7 +84,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter); _queue = queue; - _queue.addSubscriptionRegistrationListener(this); + _queue.addConsumerRegistrationListener(this); populateConsumers(); _statistics = new QueueStatisticsAdapter(queue); _queue.setNotificationListener(this); @@ -124,21 +124,21 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs private void populateConsumers() { - Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers(); + Collection<Consumer> actualConsumers = _queue.getConsumers(); synchronized (_consumerAdapters) { - Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator(); - for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions) + Iterator<Consumer> iter = _consumerAdapters.keySet().iterator(); + for(Consumer consumer : actualConsumers) { - if(!_consumerAdapters.containsKey(subscription)) + if(!_consumerAdapters.containsKey(consumer)) { - SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); - ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription); - _consumerAdapters.put(subscription, adapter); + SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel()); + ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, consumer); + _consumerAdapters.put(consumer, adapter); if (sessionAdapter != null) { // Register ConsumerAdapter with the SessionAdapter. - sessionAdapter.subscriptionRegistered(subscription, adapter); + sessionAdapter.consumerRegistered(consumer, adapter); } } } @@ -153,11 +153,11 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } } - public Collection<Consumer> getConsumers() + public Collection<org.apache.qpid.server.model.Consumer> getConsumers() { synchronized (_consumerAdapters) { - return new ArrayList<Consumer>(_consumerAdapters.values()); + return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values()); } } @@ -502,7 +502,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { - if(clazz == Consumer.class) + if(clazz == org.apache.qpid.server.model.Consumer.class) { return (Collection<C>) getConsumers(); } @@ -587,19 +587,19 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs return _queue; } - public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription) + public void consumerAdded(final AMQQueue queue, final Consumer consumer) { ConsumerAdapter adapter = null; synchronized (_consumerAdapters) { - if(!_consumerAdapters.containsKey(subscription)) + if(!_consumerAdapters.containsKey(consumer)) { - SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); - adapter = new ConsumerAdapter(this, sessionAdapter, subscription); - _consumerAdapters.put(subscription, adapter); + SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel()); + adapter = new ConsumerAdapter(this, sessionAdapter, consumer); + _consumerAdapters.put(consumer, adapter); if (sessionAdapter != null) { // Register ConsumerAdapter with the SessionAdapter. - sessionAdapter.subscriptionRegistered(subscription, adapter); + sessionAdapter.consumerRegistered(consumer, adapter); } } } @@ -609,20 +609,20 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } } - public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription) + public void consumerRemoved(final AMQQueue queue, final Consumer consumer) { ConsumerAdapter adapter = null; synchronized (_consumerAdapters) { - adapter = _consumerAdapters.remove(subscription); + adapter = _consumerAdapters.remove(consumer); } if(adapter != null) { - SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel()); if (sessionAdapter != null) { // Unregister ConsumerAdapter with the SessionAdapter. - sessionAdapter.subscriptionUnregistered(subscription); + sessionAdapter.consumerUnregistered(consumer); } childRemoved(adapter); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 31ce7e56fd..6b76eeefa0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -34,9 +34,8 @@ import org.apache.qpid.server.model.Publisher; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; -import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -47,7 +46,7 @@ final class SessionAdapter extends AbstractAdapter implements Session private AMQSessionModel _session; private SessionStatistics _statistics; - private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>(); + private Map<Consumer, ConsumerAdapter> _consumerAdapters = new HashMap<Consumer, ConsumerAdapter>(); public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor) { @@ -56,11 +55,11 @@ final class SessionAdapter extends AbstractAdapter implements Session _statistics = new SessionStatistics(); } - public Collection<Consumer> getSubscriptions() + public Collection<org.apache.qpid.server.model.Consumer> getConsumers() { synchronized (_consumerAdapters) { - return new ArrayList<Consumer>(_consumerAdapters.values()); + return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values()); } } @@ -119,29 +118,29 @@ final class SessionAdapter extends AbstractAdapter implements Session } /** - * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription. - * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter. + * Register a ConsumerAdapter with this Session keyed by the Consumer. + * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter. * @param adapter the registered ConsumerAdapter. */ - void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter) + void consumerRegistered(Consumer consumer, ConsumerAdapter adapter) { synchronized (_consumerAdapters) { - _consumerAdapters.put(subscription, adapter); + _consumerAdapters.put(consumer, adapter); } childAdded(adapter); } /** - * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription. - * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter. + * Unregister a ConsumerAdapter with this Session keyed by the Consumer. + * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter. */ - void subscriptionUnregistered(Subscription subscription) + void consumerUnregistered(Consumer consumer) { ConsumerAdapter adapter = null; synchronized (_consumerAdapters) { - adapter = _consumerAdapters.remove(subscription); + adapter = _consumerAdapters.remove(consumer); } if (adapter != null) { @@ -188,9 +187,9 @@ final class SessionAdapter extends AbstractAdapter implements Session @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { - if(clazz == Consumer.class) + if(clazz == org.apache.qpid.server.model.Consumer.class) { - return (Collection<C>) getSubscriptions(); + return (Collection<C>) getConsumers(); } else if(clazz == Publisher.class) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 8cd8a3edee..018ba454e4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -31,8 +31,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -83,27 +83,27 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa VirtualHost getVirtualHost(); - Subscription registerSubscription(final SubscriptionTarget target, final FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final String consumerName, EnumSet<Subscription.Option> options) throws AMQException; + Consumer addConsumer(final ConsumerTarget target, final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, EnumSet<Consumer.Option> options) throws AMQException; - Collection<Subscription> getConsumers(); + Collection<Consumer> getConsumers(); - interface SubscriptionRegistrationListener + interface ConsumerRegistrationListener { - void subscriptionRegistered(AMQQueue queue, Subscription subscription); - void subscriptionUnregistered(AMQQueue queue, Subscription subscription); + void consumerAdded(AMQQueue queue, Consumer consumer); + void consumerRemoved(AMQQueue queue, Consumer consumer); } - void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener); - void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener); + void addConsumerRegistrationListener(ConsumerRegistrationListener listener); + void removeConsumerRegistrationListener(ConsumerRegistrationListener listener); int getConsumerCount(); int getActiveConsumerCount(); - boolean hasExclusiveSubscriber(); + boolean hasExclusiveConsumer(); boolean isUnused(); @@ -126,11 +126,11 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa void requeue(QueueEntry entry); - void dequeue(QueueEntry entry, Subscription sub); + void dequeue(QueueEntry entry, Consumer sub); void decrementUnackedMsgCount(QueueEntry queueEntry); - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; + boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException; void addQueueDeleteTask(Action<AMQQueue> task); void removeQueueDeleteTask(Action<AMQQueue> task); @@ -209,7 +209,7 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa Set<NotificationCheck> getNotificationChecks(); - void flushSubscription(final Subscription sub) throws AMQException; + void flushConsumer(final Consumer sub) throws AMQException; void deliverAsync(); @@ -227,43 +227,43 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa void checkCapacity(AMQSessionModel channel); /** - * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription + * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer * already exists. * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists. + * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists. * </table> * * @todo Not an AMQP exception as no status code. * * @todo Move to top level, used outside this class. */ - static final class ExistingExclusiveSubscription extends AMQException + static final class ExistingExclusiveConsumer extends AMQException { - public ExistingExclusiveSubscription() + public ExistingExclusiveConsumer() { super(""); } } /** - * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription + * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer * already exists. * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists. + * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists. * </table> * * @todo Not an AMQP exception as no status code. * * @todo Move to top level, used outside this class. */ - static final class ExistingSubscriptionPreventsExclusive extends AMQException + static final class ExistingConsumerPreventsExclusive extends AMQException { - public ExistingSubscriptionPreventsExclusive() + public ExistingConsumerPreventsExclusive() { super(""); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedSubscriptionMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java index e7e34ac2b8..a9b36c1b24 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedSubscriptionMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,16 +28,16 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; -public class AssignedSubscriptionMessageGroupManager implements MessageGroupManager +public class AssignedConsumerMessageGroupManager implements MessageGroupManager { - private static final Logger _logger = LoggerFactory.getLogger(AssignedSubscriptionMessageGroupManager.class); + private static final Logger _logger = LoggerFactory.getLogger(AssignedConsumerMessageGroupManager.class); private final String _groupId; - private final ConcurrentHashMap<Integer, QueueSubscription> _groupMap = new ConcurrentHashMap<Integer, QueueSubscription>(); + private final ConcurrentHashMap<Integer, QueueConsumer> _groupMap = new ConcurrentHashMap<Integer, QueueConsumer>(); private final int _groupMask; - public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups) + public AssignedConsumerMessageGroupManager(final String groupId, final int maxGroups) { _groupId = groupId; _groupMask = pow2(maxGroups)-1; @@ -53,13 +53,13 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana return val; } - public QueueSubscription getAssignedSubscription(final QueueEntry entry) + public QueueConsumer getAssignedConsumer(final QueueEntry entry) { Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask); } - public boolean acceptMessage(QueueSubscription sub, QueueEntry entry) + public boolean acceptMessage(QueueConsumer sub, QueueEntry entry) { if(assignMessage(sub, entry)) { @@ -71,7 +71,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana } } - private boolean assignMessage(QueueSubscription sub, QueueEntry entry) + private boolean assignMessage(QueueConsumer sub, QueueEntry entry) { Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); if(groupVal == null) @@ -81,7 +81,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana else { Integer group = groupVal.hashCode() & _groupMask; - QueueSubscription assignedSub = _groupMap.get(group); + QueueConsumer assignedSub = _groupMap.get(group); if(assignedSub == sub) { return true; @@ -105,7 +105,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana } } - public QueueEntry findEarliestAssignedAvailableEntry(QueueSubscription sub) + public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub) { EntryFinder visitor = new EntryFinder(sub); sub.getQueue().visit(visitor); @@ -115,9 +115,9 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana private class EntryFinder implements QueueEntryVisitor { private QueueEntry _entry; - private QueueSubscription _sub; + private QueueConsumer _sub; - public EntryFinder(final QueueSubscription sub) + public EntryFinder(final QueueConsumer sub) { _sub = sub; } @@ -136,7 +136,7 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana } Integer group = groupId.hashCode() & _groupMask; - Subscription assignedSub = _groupMap.get(group); + Consumer assignedSub = _groupMap.get(group); if(assignedSub == _sub) { _entry = entry; @@ -154,9 +154,9 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana } } - public void clearAssignments(QueueSubscription sub) + public void clearAssignments(QueueConsumer sub) { - Iterator<QueueSubscription> subIter = _groupMap.values().iterator(); + Iterator<QueueConsumer> subIter = _groupMap.values().iterator(); while(subIter.hasNext()) { if(subIter.next() == sub) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index 805bccd744..d5c03abc93 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,23 +38,23 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager private final String _groupId; private final String _defaultGroup; private final Map<Object, Group> _groupMap = new HashMap<Object, Group>(); - private final SubscriptionResetHelper _resetHelper; + private final ConsumerResetHelper _resetHelper; private final class Group { private final Object _group; - private QueueSubscription _subscription; + private QueueConsumer _consumer; private int _activeCount; - private Group(final Object key, final QueueSubscription subscription) + private Group(final Object key, final QueueConsumer consumer) { _group = key; - _subscription = subscription; + _consumer = consumer; } public boolean add() { - if(_subscription != null) + if(_consumer != null) { _activeCount++; return true; @@ -69,8 +69,8 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager { if(--_activeCount == 0) { - _resetHelper.resetSubPointersForGroups(_subscription, false); - _subscription = null; + _resetHelper.resetSubPointersForGroups(_consumer, false); + _consumer = null; _groupMap.remove(_group); } } @@ -100,12 +100,12 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager public boolean isValid() { - return !(_subscription == null || (_activeCount == 0 && _subscription.isClosed())); + return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed())); } - public QueueSubscription getSubscription() + public QueueConsumer getConsumer() { - return _subscription; + return _consumer; } @Override @@ -113,28 +113,28 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager { return "Group{" + "_group=" + _group + - ", _subscription=" + _subscription + + ", _consumer=" + _consumer + ", _activeCount=" + _activeCount + '}'; } } - public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, SubscriptionResetHelper resetHelper) + public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper) { _groupId = groupId; _defaultGroup = defaultGroup; _resetHelper = resetHelper; } - public synchronized QueueSubscription getAssignedSubscription(final QueueEntry entry) + public synchronized QueueConsumer getAssignedConsumer(final QueueEntry entry) { Object groupId = getKey(entry); Group group = _groupMap.get(groupId); - return group == null || !group.isValid() ? null : group.getSubscription(); + return group == null || !group.isValid() ? null : group.getConsumer(); } - public synchronized boolean acceptMessage(final QueueSubscription sub, final QueueEntry entry) + public synchronized boolean acceptMessage(final QueueConsumer sub, final QueueEntry entry) { if(assignMessage(sub, entry)) { @@ -146,7 +146,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - private boolean assignMessage(final QueueSubscription sub, final QueueEntry entry) + private boolean assignMessage(final QueueConsumer sub, final QueueEntry entry) { Object groupId = getKey(entry); Group group = _groupMap.get(groupId); @@ -158,15 +158,15 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager _groupMap.put(groupId, group); // there's a small change that the group became empty between the point at which getNextAvailable() was - // called on the subscription, and when accept message is called... in that case we want to avoid delivering + // called on the consumer, and when accept message is called... in that case we want to avoid delivering // out of order - if(_resetHelper.isEntryAheadOfSubscription(entry, sub)) + if(_resetHelper.isEntryAheadOfConsumer(entry, sub)) { return false; } } - Subscription assignedSub = group.getSubscription(); + Consumer assignedSub = group.getConsumer(); if(assignedSub == sub) { @@ -179,7 +179,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueSubscription sub) + public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer sub) { EntryFinder visitor = new EntryFinder(sub); sub.getQueue().visit(visitor); @@ -189,9 +189,9 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager private class EntryFinder implements QueueEntryVisitor { private QueueEntry _entry; - private QueueSubscription _sub; + private QueueConsumer _sub; - public EntryFinder(final QueueSubscription sub) + public EntryFinder(final QueueConsumer sub) { _sub = sub; } @@ -206,7 +206,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager Object groupId = getKey(entry); Group group = _groupMap.get(groupId); - if(group != null && group.getSubscription() == _sub) + if(group != null && group.getConsumer() == _sub) { _entry = entry; return true; @@ -224,7 +224,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } - public void clearAssignments(final QueueSubscription sub) + public void clearAssignments(final QueueConsumer sub) { } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java index 4ecddf6181..740a96bf2d 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java @@ -20,22 +20,20 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.subscription.Subscription; - public interface MessageGroupManager { - public interface SubscriptionResetHelper + public interface ConsumerResetHelper { - public void resetSubPointersForGroups(QueueSubscription subscription, boolean clearAssignments); + public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments); - boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub); + boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub); } - QueueSubscription getAssignedSubscription(QueueEntry entry); + QueueConsumer getAssignedConsumer(QueueEntry entry); - boolean acceptMessage(QueueSubscription sub, QueueEntry entry); + boolean acceptMessage(QueueConsumer sub, QueueEntry entry); - QueueEntry findEarliestAssignedAvailableEntry(QueueSubscription sub); + QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub); - void clearAssignments(QueueSubscription sub); + void clearAssignments(QueueConsumer sub); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 4f87e18e63..6918ae683c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.queue.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -38,16 +36,16 @@ public abstract class OutOfOrderQueue extends SimpleAMQQueue } @Override - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry) { - // check that all subscriptions are not in advance of the entry - SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator(); + // check that all consumers are not in advance of the entry + QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator(); while(subIter.advance() && !entry.isAcquired()) { - final QueueSubscription subscription = subIter.getNode().getSubscription(); - if(!subscription.isClosed()) + final QueueConsumer consumer = subIter.getNode().getConsumer(); + if(!consumer.isClosed()) { - QueueContext context = subscription.getQueueContext(); + QueueContext context = consumer.getQueueContext(); if(context != null) { QueueEntry released = context.getReleasedEntry(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index f683159e12..d8332f7306 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -33,8 +33,8 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.util.StateChangeListener; import java.text.MessageFormat; @@ -47,16 +47,16 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; -class QueueSubscription<T extends SubscriptionTarget> implements Subscription +class QueueConsumer<T extends ConsumerTarget> implements Consumer { - private static final Logger _logger = Logger.getLogger(QueueSubscription.class); + private static final Logger _logger = Logger.getLogger(QueueConsumer.class); private final AtomicBoolean _targetClosed = new AtomicBoolean(false); private final AtomicBoolean _closed = new AtomicBoolean(false); - private final long _subscriptionID; + private final long _id; private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); private final Lock _stateChangeLock = new ReentrantLock(); private final long _createTime = System.currentTimeMillis(); - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this); private final boolean _acquires; private final boolean _seesRequeues; private final String _consumerName; @@ -69,39 +69,39 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription private SimpleAMQQueue _queue; private GenericActor _logActor; - static final EnumMap<SubscriptionTarget.State, State> STATE_MAP = - new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class); + static final EnumMap<ConsumerTarget.State, State> STATE_MAP = + new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class); static { - STATE_MAP.put(SubscriptionTarget.State.ACTIVE, State.ACTIVE); - STATE_MAP.put(SubscriptionTarget.State.SUSPENDED, State.SUSPENDED); - STATE_MAP.put(SubscriptionTarget.State.CLOSED, State.CLOSED); + STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE); + STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED); + STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED); } private final T _target; private final SubFlushRunner _runner = new SubFlushRunner(this); private volatile QueueContext _queueContext; - private StateChangeListener<? extends Subscription, State> _stateListener = new StateChangeListener<Subscription, State>() + private StateChangeListener<? extends Consumer, State> _stateListener = new StateChangeListener<Consumer, State>() { - public void stateChanged(Subscription sub, State oldState, State newState) + public void stateChanged(Consumer sub, State oldState, State newState) { CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); } }; private boolean _noLocal; - QueueSubscription(final FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final boolean acquires, - final boolean seesRequeues, - final String consumerName, - final boolean isTransient, - T target) + QueueConsumer(final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final boolean acquires, + final boolean seesRequeues, + final String consumerName, + final boolean isTransient, + T target) { _messageClass = messageClass; _sessionReference = target.getSessionModel().getConnectionReference(); - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); + _id = SUB_ID_GENERATOR.getAndIncrement(); _filters = filters; _acquires = acquires; _seesRequeues = seesRequeues; @@ -109,23 +109,23 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription _isTransient = isTransient; _target = target; _target.setStateListener( - new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>() + new StateChangeListener<ConsumerTarget, ConsumerTarget.State>() { @Override - public void stateChanged(final SubscriptionTarget object, - final SubscriptionTarget.State oldState, - final SubscriptionTarget.State newState) + public void stateChanged(final ConsumerTarget object, + final ConsumerTarget.State oldState, + final ConsumerTarget.State newState) { targetStateChanged(oldState, newState); } }); } - private void targetStateChanged(final SubscriptionTarget.State oldState, final SubscriptionTarget.State newState) + private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState) { if(oldState != newState) { - if(newState == SubscriptionTarget.State.CLOSED) + if(newState == ConsumerTarget.State.CLOSED) { if(_targetClosed.compareAndSet(false,true)) { @@ -138,7 +138,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription } } - if(newState == SubscriptionTarget.State.CLOSED && oldState != newState && !_closed.get()) + if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) { try { @@ -146,12 +146,12 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription } catch (AMQException e) { - _logger.error("Unable to remove to remove subscription", e); + _logger.error("Unable to remove to remove consumer", e); throw new RuntimeException(e); } } - final StateChangeListener<Subscription, State> stateListener = - (StateChangeListener<Subscription, State>) getStateListener(); + final StateChangeListener<Consumer, State> stateListener = + (StateChangeListener<Consumer, State>) getStateListener(); if(stateListener != null) { stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState)); @@ -202,8 +202,8 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription try { _target.close(); - _target.subscriptionRemoved(this); - _queue.unregisterSubscription(this); + _target.consumerRemoved(this); + _queue.unregisterConsumer(this); } finally { @@ -258,13 +258,13 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription { if(getQueue() != null) { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); + throw new IllegalStateException("Attempt to set queue for consumer " + this + " to " + queue + "when already set to " + getQueue()); } _queue = queue; String queueString = new QueueLogSubject(_queue).toLogString(); - _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) + "(" // queueString is [vh(/{0})/qu({1}) ] so need to trim // ^ ^^ @@ -295,7 +295,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription @Override public final void flush() throws AMQException { - getQueue().flushSubscription(this); + getQueue().flushConsumer(this); } @Override @@ -309,17 +309,17 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription return _runner; } - public final long getSubscriptionID() + public final long getId() { - return _subscriptionID; + return _id; } - public final StateChangeListener<? extends Subscription, State> getStateListener() + public final StateChangeListener<? extends Consumer, State> getStateListener() { return _stateListener; } - public final void setStateListener(StateChangeListener<? extends Subscription, State> listener) + public final void setStateListener(StateChangeListener<? extends Consumer, State> listener) { _stateListener = listener; } @@ -430,7 +430,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription return _createTime; } - public final MessageInstance.SubscriptionAcquiredState getOwningState() + public final MessageInstance.ConsumerAcquiredState getOwningState() { return _owningState; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubscriptionList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java index 7329cbb88b..82e9d58cf3 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubscriptionList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java @@ -24,28 +24,28 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -class SubscriptionList +class QueueConsumerList { - private final SubscriptionNode _head = new SubscriptionNode(); + private final ConsumerNode _head = new ConsumerNode(); - private final AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head); - private final AtomicReference<SubscriptionNode> _subNodeMarker = new AtomicReference<SubscriptionNode>(_head); + private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head); + private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head); private final AtomicInteger _size = new AtomicInteger(); - public static final class SubscriptionNode + public static final class ConsumerNode { private final AtomicBoolean _deleted = new AtomicBoolean(); - private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>(); - private final QueueSubscription _sub; + private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>(); + private final QueueConsumer _sub; - public SubscriptionNode() + public ConsumerNode() { //used for sentinel head and dummy node construction _sub = null; _deleted.set(true); } - public SubscriptionNode(final QueueSubscription sub) + public ConsumerNode(final QueueConsumer sub) { //used for regular node construction _sub = sub; @@ -57,12 +57,12 @@ class SubscriptionList * * @return the next non-deleted node, or null if none was found. */ - public SubscriptionNode findNext() + public ConsumerNode findNext() { - SubscriptionNode next = nextNode(); + ConsumerNode next = nextNode(); while(next != null && next.isDeleted()) { - final SubscriptionNode newNext = next.nextNode(); + final ConsumerNode newNext = next.nextNode(); if(newNext != null) { //try to move our _next reference forward to the 'newNext' @@ -86,7 +86,7 @@ class SubscriptionList * * @return the immediately next node in the structure, or null if at the tail. */ - protected SubscriptionNode nextNode() + protected ConsumerNode nextNode() { return _next.get(); } @@ -94,10 +94,10 @@ class SubscriptionList /** * Used to initialise the 'next' reference. Will only succeed if the reference was not previously set. * - * @param node the SubscriptionNode to set as 'next' + * @param node the ConsumerNode to set as 'next' * @return whether the operation succeeded */ - private boolean setNext(final SubscriptionNode node) + private boolean setNext(final ConsumerNode node) { return _next.compareAndSet(null, node); } @@ -112,18 +112,18 @@ class SubscriptionList return _deleted.compareAndSet(false,true); } - public QueueSubscription getSubscription() + public QueueConsumer getConsumer() { return _sub; } } - private void insert(final SubscriptionNode node, final boolean count) + private void insert(final ConsumerNode node, final boolean count) { for (;;) { - SubscriptionNode tail = _tail.get(); - SubscriptionNode next = tail.nextNode(); + ConsumerNode tail = _tail.get(); + ConsumerNode next = tail.nextNode(); if (tail == _tail.get()) { if (next == null) @@ -146,35 +146,35 @@ class SubscriptionList } } - public void add(final QueueSubscription sub) + public void add(final QueueConsumer sub) { - SubscriptionNode node = new SubscriptionNode(sub); + ConsumerNode node = new ConsumerNode(sub); insert(node, true); } - public boolean remove(final QueueSubscription sub) + public boolean remove(final QueueConsumer sub) { - SubscriptionNode prevNode = _head; - SubscriptionNode node = _head.nextNode(); + ConsumerNode prevNode = _head; + ConsumerNode node = _head.nextNode(); while(node != null) { - if(sub.equals(node.getSubscription()) && node.delete()) + if(sub.equals(node.getConsumer()) && node.delete()) { _size.decrementAndGet(); - SubscriptionNode tail = _tail.get(); + ConsumerNode tail = _tail.get(); if(node == tail) { //we cant remove the last node from the structure for //correctness reasons, however we have just 'deleted' //the tail. Inserting an empty dummy node after it will - //let us scavenge the node containing the Subscription. - insert(new SubscriptionNode(), false); + //let us scavenge the node containing the Consumer. + insert(new ConsumerNode(), false); } //advance the next node reference in the 'prevNode' to scavenge - //the newly 'deleted' node for the Subscription. + //the newly 'deleted' node for the Consumer. prevNode.findNext(); nodeMarkerCleanup(node); @@ -189,9 +189,9 @@ class SubscriptionList return false; } - private void nodeMarkerCleanup(final SubscriptionNode node) + private void nodeMarkerCleanup(final ConsumerNode node) { - SubscriptionNode markedNode = _subNodeMarker.get(); + ConsumerNode markedNode = _subNodeMarker.get(); if(node == markedNode) { //if the marked node is the one we are removing, then @@ -200,7 +200,7 @@ class SubscriptionList //into the list and find the next node to use. //Because we inserted a dummy if node was the //tail, markedNode.nextNode() can never be null. - SubscriptionNode dummy = new SubscriptionNode(); + ConsumerNode dummy = new ConsumerNode(); dummy.setNext(markedNode.nextNode()); //if the CAS fails the marked node has changed, thus @@ -219,53 +219,53 @@ class SubscriptionList } } - public boolean updateMarkedNode(final SubscriptionNode expected, final SubscriptionNode nextNode) + public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode) { return _subNodeMarker.compareAndSet(expected, nextNode); } /** - * Get the current marked SubscriptionNode. This should only be used only to index into the list and find the next node + * Get the current marked ConsumerNode. This should only be used only to index into the list and find the next node * after the mark, since if the previously marked node was subsequently deleted the item returned may be a dummy node * with reference to the next node. * * @return the previously marked node (or a dummy if it was subsequently deleted) */ - public SubscriptionNode getMarkedNode() + public ConsumerNode getMarkedNode() { return _subNodeMarker.get(); } - public static class SubscriptionNodeIterator + public static class ConsumerNodeIterator { - private SubscriptionNode _lastNode; + private ConsumerNode _lastNode; - SubscriptionNodeIterator(SubscriptionNode startNode) + ConsumerNodeIterator(ConsumerNode startNode) { _lastNode = startNode; } - public SubscriptionNode getNode() + public ConsumerNode getNode() { return _lastNode; } public boolean advance() { - SubscriptionNode nextNode = _lastNode.findNext(); + ConsumerNode nextNode = _lastNode.findNext(); _lastNode = nextNode; return _lastNode != null; } } - public SubscriptionNodeIterator iterator() + public ConsumerNodeIterator iterator() { - return new SubscriptionNodeIterator(_head); + return new ConsumerNodeIterator(_head); } - public SubscriptionNode getHead() + public ConsumerNode getHead() { return _head; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 61f38f963c..d984cf8ab4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -20,13 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.StateChangeListener; public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ea50337b61..7908cc9de7 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -186,7 +186,7 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean acquire() { - return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE); + return acquire(NON_CONSUMER_ACQUIRED_STATE); } private boolean acquire(final EntryState state) @@ -201,7 +201,7 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } - public boolean acquire(Subscription sub) + public boolean acquire(Consumer sub) { final boolean acquired = acquire(sub.getOwningState()); if(acquired) @@ -211,17 +211,17 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } - public boolean acquiredBySubscription() + public boolean acquiredByConsumer() { - return (_state instanceof SubscriptionAcquiredState); + return (_state instanceof ConsumerAcquiredState); } - public boolean isAcquiredBy(Subscription subscription) + public boolean isAcquiredBy(Consumer consumer) { EntryState state = _state; - return state instanceof SubscriptionAcquiredState - && ((SubscriptionAcquiredState)state).getSubscription() == subscription; + return state instanceof ConsumerAcquiredState + && ((ConsumerAcquiredState)state).getConsumer() == consumer; } public void release() @@ -231,7 +231,7 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE)) { - if(state instanceof SubscriptionAcquiredState) + if(state instanceof ConsumerAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -263,12 +263,12 @@ public abstract class QueueEntryImpl implements QueueEntry return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED)); } - public Subscription getDeliveredSubscription() + public Consumer getDeliveredConsumer() { EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) + if (state instanceof ConsumerAcquiredState) { - return ((SubscriptionAcquiredState) state).getSubscription(); + return ((ConsumerAcquiredState) state).getConsumer(); } else { @@ -278,16 +278,16 @@ public abstract class QueueEntryImpl implements QueueEntry public void reject() { - Subscription subscription = getDeliveredSubscription(); + Consumer consumer = getDeliveredConsumer(); - if (subscription != null) + if (consumer != null) { if (_rejectedBy == null) { _rejectedBy = new HashSet<Long>(); } - _rejectedBy.add(subscription.getSubscriptionID()); + _rejectedBy.add(consumer.getId()); } else { @@ -295,12 +295,12 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(Consumer consumer) { - if (_rejectedBy != null) // We have subscriptions that rejected this message + if (_rejectedBy != null) // We have consumers that rejected this message { - return _rejectedBy.contains(subscription.getSubscriptionID()); + return _rejectedBy.contains(consumer.getId()); } else // This message hasn't been rejected yet. { @@ -314,8 +314,8 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - Subscription s = null; - if (state instanceof SubscriptionAcquiredState) + Consumer s = null; + if (state instanceof ConsumerAcquiredState) { getQueue().decrementUnackedMsgCount(this); } @@ -490,7 +490,7 @@ public abstract class QueueEntryImpl implements QueueEntry @Override public boolean resend() throws AMQException { - Subscription sub = getDeliveredSubscription(); + Consumer sub = getDeliveredConsumer(); if(sub != null) { return sub.resend(this); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 22a2029494..005d9b66b3 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -32,8 +32,8 @@ import org.apache.qpid.transport.TransportException; /** * QueueRunners are Runnables used to process a queue when requiring - * asynchronous message delivery to subscriptions, which is necessary - * when straight-through delivery of a message to a subscription isn't + * asynchronous message delivery to consumers, which is necessary + * when straight-through delivery of a message to a consumer isn't * possible during the enqueue operation. */ public class QueueRunner implements Runnable diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 36dc04a5fc..78585997be 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -46,8 +46,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -56,8 +56,8 @@ import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; public class SimpleAMQQueue implements AMQQueue, - StateChangeListener<QueueSubscription, Subscription.State>, - MessageGroupManager.SubscriptionResetHelper + StateChangeListener<QueueConsumer, Consumer.State>, + MessageGroupManager.ConsumerResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -92,9 +92,9 @@ public class SimpleAMQQueue implements AMQQueue, private final QueueEntryList<QueueEntry> _entries; - private final SubscriptionList _subscriptionList = new SubscriptionList(); + private final QueueConsumerList _consumerList = new QueueConsumerList(); - private volatile QueueSubscription _exclusiveSubscriber; + private volatile QueueConsumer _exclusiveSubscriber; @@ -177,8 +177,8 @@ public class SimpleAMQQueue implements AMQQueue, private int _maximumDeliveryCount; private final MessageGroupManager _messageGroupManager; - private final Collection<SubscriptionRegistrationListener> _subscriptionListeners = - new ArrayList<SubscriptionRegistrationListener>(); + private final Collection<ConsumerRegistrationListener> _consumerListeners = + new ArrayList<ConsumerRegistrationListener>(); private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; @@ -247,7 +247,7 @@ public class SimpleAMQQueue implements AMQQueue, } else { - _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -378,15 +378,15 @@ public class SimpleAMQQueue implements AMQQueue, return _name; } - // ------ Manage Subscriptions + // ------ Manage Consumers @Override - public QueueSubscription registerSubscription(final SubscriptionTarget target, - final FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final String consumerName, - EnumSet<Subscription.Option> optionSet) throws AMQException + public QueueConsumer addConsumer(final ConsumerTarget target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<Consumer.Option> optionSet) throws AMQException { // Access control @@ -396,59 +396,59 @@ public class SimpleAMQQueue implements AMQQueue, } - if (hasExclusiveSubscriber()) + if (hasExclusiveConsumer()) { - throw new ExistingExclusiveSubscription(); + throw new ExistingExclusiveConsumer(); } - boolean exclusive = optionSet.contains(Subscription.Option.EXCLUSIVE); - boolean isTransient = optionSet.contains(Subscription.Option.TRANSIENT); + boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); if (exclusive && !isTransient && getConsumerCount() != 0) { - throw new ExistingSubscriptionPreventsExclusive(); + throw new ExistingConsumerPreventsExclusive(); } - QueueSubscription subscription = new QueueSubscription(filters, messageClass, - optionSet.contains(Subscription.Option.ACQUIRES), - optionSet.contains(Subscription.Option.SEES_REQUEUES), - consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target); - target.subscriptionRegistered(subscription); + QueueConsumer consumer = new QueueConsumer(filters, messageClass, + optionSet.contains(Consumer.Option.ACQUIRES), + optionSet.contains(Consumer.Option.SEES_REQUEUES), + consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target); + target.consumerAdded(consumer); if (exclusive && !isTransient) { - _exclusiveSubscriber = subscription; + _exclusiveSubscriber = consumer; } - if(subscription.isActive()) + if(consumer.isActive()) { _activeSubscriberCount.incrementAndGet(); } - subscription.setStateListener(this); - subscription.setQueueContext(new QueueContext(_entries.getHead())); + consumer.setStateListener(this); + consumer.setQueueContext(new QueueContext(_entries.getHead())); if (!isDeleted()) { - subscription.setQueue(this, exclusive); + consumer.setQueue(this, exclusive); if(_nolocal) { - subscription.setNoLocal(_nolocal); + consumer.setNoLocal(_nolocal); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener listener : _consumerListeners) { - listener.subscriptionRegistered(this, subscription); + listener.consumerAdded(this, consumer); } } - _subscriptionList.add(subscription); + _consumerList.add(consumer); - //Increment consumerCountHigh if necessary. (un)registerSubscription are both + //Increment consumerCountHigh if necessary. (un)registerConsumer are both //synchronized methods so we don't need additional synchronization here if(_consumerCountHigh.get() < getConsumerCount()) { @@ -457,7 +457,7 @@ public class SimpleAMQQueue implements AMQQueue, if (isDeleted()) { - subscription.queueDeleted(); + consumer.queueDeleted(); } } else @@ -465,27 +465,27 @@ public class SimpleAMQQueue implements AMQQueue, // TODO } - deliverAsync(subscription); + deliverAsync(consumer); - return subscription; + return consumer; } - synchronized void unregisterSubscription(final QueueSubscription subscription) throws AMQException + synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException { - if (subscription == null) + if (consumer == null) { - throw new NullPointerException("subscription argument is null"); + throw new NullPointerException("consumer argument is null"); } - boolean removed = _subscriptionList.remove(subscription); + boolean removed = _consumerList.remove(consumer); if (removed) { - subscription.close(); + consumer.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); - subscription.setQueueContext(null); + consumer.setQueueContext(null); if(!isDeleted() && isExclusive() && getConsumerCount() == 0) { @@ -494,20 +494,20 @@ public class SimpleAMQQueue implements AMQQueue, if(_messageGroupManager != null) { - resetSubPointersForGroups(subscription, true); + resetSubPointersForGroups(consumer, true); } - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - for(SubscriptionRegistrationListener listener : _subscriptionListeners) + for(ConsumerRegistrationListener listener : _consumerListeners) { - listener.subscriptionUnregistered(this, subscription); + listener.consumerRemoved(this, consumer); } } // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) + if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 ) { if (_logger.isInfoEnabled()) { @@ -516,57 +516,57 @@ public class SimpleAMQQueue implements AMQQueue, getVirtualHost().removeQueue(this); - // we need to manually fire the event to the removed subscription (which was the last one left for this - // queue. This is because the delete method uses the subscription set which has just been cleared - subscription.queueDeleted(); + // we need to manually fire the event to the removed consumer (which was the last one left for this + // queue. This is because the delete method uses the consumer set which has just been cleared + consumer.queueDeleted(); } } } - public Collection<Subscription> getConsumers() + public Collection<Consumer> getConsumers() { - List<Subscription> consumers = new ArrayList<Subscription>(); - SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator(); + List<Consumer> consumers = new ArrayList<Consumer>(); + QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator(); while(iter.advance()) { - consumers.add(iter.getNode().getSubscription()); + consumers.add(iter.getNode().getConsumer()); } return consumers; } - public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.add(listener); + _consumerListeners.add(listener); } } - public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) { - synchronized (_subscriptionListeners) + synchronized (_consumerListeners) { - _subscriptionListeners.remove(listener); + _consumerListeners.remove(listener); } } - public void resetSubPointersForGroups(QueueSubscription subscription, boolean clearAssignments) + public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments) { - QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); if(clearAssignments) { - _messageGroupManager.clearAssignments(subscription); + _messageGroupManager.clearAssignments(consumer); } if(entry != null) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance()) { - QueueSubscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -644,35 +644,35 @@ public class SimpleAMQQueue implements AMQQueue, QueueEntry entry; - final QueueSubscription exclusiveSub = _exclusiveSubscriber; + final QueueConsumer exclusiveSub = _exclusiveSubscriber; entry = _entries.add(message); if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) { /* - iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message + iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message */ - SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode(); - SubscriptionList.SubscriptionNode nextNode = node.findNext(); + QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); + QueueConsumerList.ConsumerNode nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } while (nextNode != null) { - if (_subscriptionList.updateMarkedNode(node, nextNode)) + if (_consumerList.updateMarkedNode(node, nextNode)) { break; } else { - node = _subscriptionList.getMarkedNode(); + node = _consumerList.getMarkedNode(); nextNode = node.findNext(); if (nextNode == null) { - nextNode = _subscriptionList.getHead().findNext(); + nextNode = _consumerList.getHead().findNext(); } } } @@ -686,13 +686,13 @@ public class SimpleAMQQueue implements AMQQueue, if (nextNode == null) { loops--; - nextNode = _subscriptionList.getHead(); + nextNode = _consumerList.getHead(); } else { - // if subscription at end, and active, offer - QueueSubscription sub = nextNode.getSubscription(); - deliverToSubscription(sub, entry); + // if consumer at end, and active, offer + QueueConsumer sub = nextNode.getConsumer(); + deliverToConsumer(sub, entry); } nextNode = nextNode.findNext(); @@ -702,7 +702,7 @@ public class SimpleAMQQueue implements AMQQueue, if (entry.isAvailable()) { - checkSubscriptionsNotAheadOfDelivery(entry); + checkConsumersNotAheadOfDelivery(entry); if (exclusiveSub != null) { @@ -723,7 +723,7 @@ public class SimpleAMQQueue implements AMQQueue, } - private void deliverToSubscription(final QueueSubscription sub, final QueueEntry entry) + private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry) throws AMQException { @@ -732,14 +732,14 @@ public class SimpleAMQQueue implements AMQQueue, try { if (!sub.isSuspended() - && subscriptionReadyAndHasInterest(sub, entry) + && consumerReadyAndHasInterest(sub, entry) && mightAssign(sub, entry) && !sub.wouldSuspend(entry)) { if (sub.acquires() && !assign(sub, entry)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(entry); } else @@ -755,7 +755,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - private boolean assign(final QueueSubscription sub, final QueueEntry entry) + private boolean assign(final QueueConsumer sub, final QueueEntry entry) { if(_messageGroupManager == null) { @@ -769,17 +769,17 @@ public class SimpleAMQQueue implements AMQQueue, } } - private boolean mightAssign(final QueueSubscription sub, final QueueEntry entry) + private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry) { if(_messageGroupManager == null || !sub.acquires()) { return true; } - QueueSubscription assigned = _messageGroupManager.getAssignedSubscription(entry); + QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry); return (assigned == null) || (assigned == sub); } - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering // Simple Queues don't :-) @@ -813,7 +813,7 @@ public class SimpleAMQQueue implements AMQQueue, getAtomicQueueCount().incrementAndGet(); } - private void deliverMessage(final QueueSubscription sub, final QueueEntry entry, boolean batch) + private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch) throws AMQException { setLastSeenEntry(sub, entry); @@ -824,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue, sub.send(entry, batch); } - private boolean subscriptionReadyAndHasInterest(final QueueSubscription sub, final QueueEntry entry) throws AMQException + private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException { return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); } - private void setLastSeenEntry(final QueueSubscription sub, final QueueEntry entry) + private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry) { QueueContext subContext = sub.getQueueContext(); if (subContext != null) @@ -845,7 +845,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - private void updateSubRequeueEntry(final QueueSubscription sub, final QueueEntry entry) + private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry) { QueueContext subContext = sub.getQueueContext(); @@ -865,11 +865,11 @@ public class SimpleAMQQueue implements AMQQueue, public void requeue(QueueEntry entry) { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance() && entry.isAvailable()) { - QueueSubscription sub = subscriberIter.getNode().getSubscription(); + QueueConsumer sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -882,11 +882,11 @@ public class SimpleAMQQueue implements AMQQueue, } - public void dequeue(QueueEntry entry, Subscription sub) + public void dequeue(QueueEntry entry, Consumer sub) { decrementQueueCount(); decrementQueueSize(entry); - if (entry.acquiredBySubscription()) + if (entry.acquiredByConsumer()) { _deliveredMessages.decrementAndGet(); } @@ -914,17 +914,17 @@ public class SimpleAMQQueue implements AMQQueue, _dequeueCount.incrementAndGet(); } - public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException + public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException { - /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message - entry to resend and move back the subscription pointer. */ + /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message + entry to resend and move back the consumer pointer. */ - subscription.getSendLock(); + consumer.getSendLock(); try { - if (!subscription.isClosed()) + if (!consumer.isClosed()) { - deliverMessage((QueueSubscription)subscription, entry, false); + deliverMessage((QueueConsumer) consumer, entry, false); return true; } else @@ -934,7 +934,7 @@ public class SimpleAMQQueue implements AMQQueue, } finally { - subscription.releaseSendLock(); + consumer.releaseSendLock(); } } @@ -942,7 +942,7 @@ public class SimpleAMQQueue implements AMQQueue, public int getConsumerCount() { - return _subscriptionList.size(); + return _consumerList.size(); } public int getConsumerCountHigh() @@ -1025,16 +1025,16 @@ public class SimpleAMQQueue implements AMQQueue, } - public void stateChanged(QueueSubscription sub, Subscription.State oldState, Subscription.State newState) + public void stateChanged(QueueConsumer sub, Consumer.State oldState, Consumer.State newState) { - if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE) + if (oldState == Consumer.State.ACTIVE && newState != Consumer.State.ACTIVE) { _activeSubscriberCount.decrementAndGet(); } - else if (newState == Subscription.State.ACTIVE) + else if (newState == Consumer.State.ACTIVE) { - if (oldState != Subscription.State.ACTIVE) + if (oldState != Consumer.State.ACTIVE) { _activeSubscriberCount.incrementAndGet(); @@ -1058,12 +1058,12 @@ public class SimpleAMQQueue implements AMQQueue, return _atomicQueueSize; } - public boolean hasExclusiveSubscriber() + public boolean hasExclusiveConsumer() { return _exclusiveSubscriber != null; } - private void setExclusiveSubscriber(QueueSubscription exclusiveSubscriber) + private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber) { _exclusiveSubscriber = exclusiveSubscriber; } @@ -1079,9 +1079,9 @@ public class SimpleAMQQueue implements AMQQueue, return _entries; } - protected SubscriptionList getSubscriptionList() + protected QueueConsumerList getConsumerList() { - return _subscriptionList; + return _consumerList; } @@ -1315,11 +1315,11 @@ public class SimpleAMQQueue implements AMQQueue, b.getExchange().removeBinding(b); } - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - QueueSubscription s = subscriptionIter.getNode().getSubscription(); + QueueConsumer s = consumerNodeIterator.getNode().getConsumer(); if (s != null) { s.queueDeleted(); @@ -1449,7 +1449,7 @@ public class SimpleAMQQueue implements AMQQueue, } - public void deliverAsync(QueueSubscription sub) + public void deliverAsync(QueueConsumer sub) { if(_exclusiveSubscriber == null) { @@ -1463,17 +1463,17 @@ public class SimpleAMQQueue implements AMQQueue, } - public void flushSubscription(Subscription sub) throws AMQException + public void flushConsumer(Consumer sub) throws AMQException { // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) { throw new AMQSecurityException("Permission denied: " + getName()); } - flushSubscription(sub, Long.MAX_VALUE); + flushConsumer(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, long iterations) throws AMQException + public boolean flushConsumer(Consumer sub, long iterations) throws AMQException { boolean atTail = false; final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; @@ -1494,8 +1494,8 @@ public class SimpleAMQQueue implements AMQQueue, sub.getSendLock(); } - atTail = attemptDelivery((QueueSubscription)sub, true); - if (atTail && getNextAvailableEntry((QueueSubscription)sub) == null) + atTail = attemptDelivery((QueueConsumer)sub, true); + if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null) { queueEmpty = true; } @@ -1529,21 +1529,21 @@ public class SimpleAMQQueue implements AMQQueue, } - // if there's (potentially) more than one subscription the others will potentially not have been advanced to the + // if there's (potentially) more than one consumer the others will potentially not have been advanced to the // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc // which would give us memory "leak". - if (!hasExclusiveSubscriber()) + if (!hasExclusiveConsumer()) { - advanceAllSubscriptions(); + advanceAllConsumers(); } return atTail; } /** - * Attempt delivery for the given subscription. + * Attempt delivery for the given consumer. * - * Looks up the next node for the subscription and attempts to deliver it. + * Looks up the next node for the consumer and attempts to deliver it. * * * @param sub @@ -1551,7 +1551,7 @@ public class SimpleAMQQueue implements AMQQueue, * @return true if we have completed all possible deliveries for this sub. * @throws AMQException */ - private boolean attemptDelivery(QueueSubscription sub, boolean batch) throws AMQException + private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException { boolean atTail = false; @@ -1570,7 +1570,7 @@ public class SimpleAMQQueue implements AMQQueue, if (sub.acquires() && !assign(sub, node)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription + // to acquire the entry for this consumer sub.restoreCredit(node); } else @@ -1581,7 +1581,7 @@ public class SimpleAMQQueue implements AMQQueue, } else // Not enough Credit for message and wouldSuspend { - //QPID-1187 - Treat the subscription as suspended for this message + //QPID-1187 - Treat the consumer as suspended for this message // and wait for the message to be removed to continue delivery. subActive = false; node.addStateChangeListener(new QueueEntryListener(sub)); @@ -1594,13 +1594,13 @@ public class SimpleAMQQueue implements AMQQueue, return atTail || !subActive; } - protected void advanceAllSubscriptions() throws AMQException + protected void advanceAllConsumers() throws AMQException { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - while (subscriberIter.advance()) + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + while (consumerNodeIterator.advance()) { - SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); - QueueSubscription sub = subNode.getSubscription(); + QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode(); + QueueConsumer sub = subNode.getConsumer(); if(sub.acquires()) { getNextAvailableEntry(sub); @@ -1612,7 +1612,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - private QueueEntry getNextAvailableEntry(final QueueSubscription sub) + private QueueEntry getNextAvailableEntry(final QueueConsumer sub) throws AMQException { QueueContext context = sub.getQueueContext(); @@ -1653,7 +1653,7 @@ public class SimpleAMQQueue implements AMQQueue, } } - public boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub) + public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub) { QueueContext context = sub.getQueueContext(); if(context != null) @@ -1672,14 +1672,14 @@ public class SimpleAMQQueue implements AMQQueue, * * A queue Runner is started whenever a state change occurs, e.g when a new * message arrives on the queue and cannot be immediately delivered to a - * subscription (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to subscriptions unsuspending) which are + * consumer (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to consumers unsuspending) which are * capable of accepting/delivering all messages then these messages would * otherwise remain on the queue. * * processQueue should be running while there are messages on the queue AND - * there are subscriptions that can deliver them. If there are no - * subscriptions capable of delivering the remaining messages on the queue + * there are consumers that can deliver them. If there are no + * consumers capable of delivering the remaining messages on the queue * then processQueue should stop to prevent spinning. * * Since processQueue is runs in a fixed size Executor, it should not run @@ -1703,7 +1703,7 @@ public class SimpleAMQQueue implements AMQQueue, boolean lastLoop = false; int iterations = MAX_ASYNC_DELIVERIES; - final int numSubs = _subscriptionList.size(); + final int numSubs = _consumerList.size(); final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); @@ -1714,8 +1714,8 @@ public class SimpleAMQQueue implements AMQQueue, // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) { - // we want to have one extra loop after every subscription has reached the point where it cannot move - // further, just in case the advance of one subscription in the last loop allows a different subscription to + // we want to have one extra loop after every consumer has reached the point where it cannot move + // further, just in case the advance of one consumer in the last loop allows a different consumer to // move forward in the next iteration if (previousStateChangeCount != stateChangeCount) @@ -1727,14 +1727,14 @@ public class SimpleAMQQueue implements AMQQueue, } previousStateChangeCount = stateChangeCount; - boolean allSubscriptionsDone = true; - boolean subscriptionDone; + boolean allConsumersDone = true; + boolean consumerDone; - SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); //iterate over the subscribers and try to advance their pointer - while (subscriptionIter.advance()) + while (consumerNodeIterator.advance()) { - QueueSubscription sub = subscriptionIter.getNode().getSubscription(); + QueueConsumer sub = consumerNodeIterator.getNode().getConsumer(); sub.getSendLock(); try @@ -1742,8 +1742,8 @@ public class SimpleAMQQueue implements AMQQueue, for(int i = 0 ; i < perSub; i++) { //attempt delivery. returns true if no further delivery currently possible to this sub - subscriptionDone = attemptDelivery(sub, true); - if (subscriptionDone) + consumerDone = attemptDelivery(sub, true); + if (consumerDone) { sub.flushBatched(); if (lastLoop && !sub.isSuspended()) @@ -1754,9 +1754,9 @@ public class SimpleAMQQueue implements AMQQueue, } else { - //this subscription can accept additional deliveries, so we must + //this consumer can accept additional deliveries, so we must //keep going after this (if iteration slicing allows it) - allSubscriptionsDone = false; + allConsumersDone = false; lastLoop = false; if(--iterations == 0) { @@ -1775,24 +1775,24 @@ public class SimpleAMQQueue implements AMQQueue, } } - if(allSubscriptionsDone && lastLoop) + if(allConsumersDone && lastLoop) { //We have done an extra loop already and there are again //again no further delivery attempts possible, only //keep going if state change demands it. deliveryIncomplete = false; } - else if(allSubscriptionsDone) + else if(allConsumersDone) { - //All subscriptions reported being done, but we have to do + //All consumers reported being done, but we have to do //an extra loop if the iterations are not exhausted and //there is still any work to be done - deliveryIncomplete = _subscriptionList.size() != 0; + deliveryIncomplete = _consumerList.size() != 0; lastLoop = true; } else { - //some subscriptions can still accept more messages, + //some consumers can still accept more messages, //keep going if iteration count allows. lastLoop = false; deliveryIncomplete = true; @@ -1970,9 +1970,9 @@ public class SimpleAMQQueue implements AMQQueue, private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State> { - private final QueueSubscription _sub; + private final QueueConsumer _sub; - public QueueEntryListener(final QueueSubscription sub) + public QueueEntryListener(final QueueConsumer sub) { _sub = sub; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 6c63b30273..44bda5182a 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -30,7 +30,7 @@ import java.util.UUID; public class SortedQueue extends OutOfOrderQueue { //Lock object to synchronize enqueue. Used instead of the object - //monitor to prevent lock order issues with subscription sendLocks + //monitor to prevent lock order issues with consumer sendLocks //and consumer updates in the super classes private final Object _sortedQueueLock = new Object(); private final String _sortedPropertyName; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 54d63afe6f..c30a48b03a 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -25,7 +25,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.transport.TransportException; import java.util.concurrent.Executor; @@ -38,7 +37,7 @@ class SubFlushRunner implements Runnable private static final Logger _logger = Logger.getLogger(SubFlushRunner.class); - private final QueueSubscription _sub; + private final QueueConsumer _sub; private static int IDLE = 0; private static int SCHEDULED = 1; @@ -51,7 +50,7 @@ class SubFlushRunner implements Runnable private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; private final AtomicBoolean _stateChange = new AtomicBoolean(); - public SubFlushRunner(QueueSubscription sub) + public SubFlushRunner(QueueConsumer sub) { _sub = sub; } @@ -65,7 +64,7 @@ class SubFlushRunner implements Runnable try { CurrentActor.set(_sub.getLogActor()); - complete = getQueue().flushSubscription(_sub, ITERATIONS); + complete = getQueue().flushConsumer(_sub, ITERATIONS); } catch (AMQException e) { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 2ee72b9a36..a9b99503ec 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -19,7 +19,7 @@ * */ -package org.apache.qpid.server.subscription; +package org.apache.qpid.server.consumer; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -35,7 +35,6 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.StateChangeListener; @@ -46,29 +45,26 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -public class MockSubscription implements SubscriptionTarget +public class MockConsumer implements ConsumerTarget { private final List<String> _messageIds; private boolean _closed = false; private String tag = "mocktag"; private AMQQueue queue = null; - private StateChangeListener<SubscriptionTarget, State> _listener = null; + private StateChangeListener<ConsumerTarget, State> _listener = null; private State _state = State.ACTIVE; private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>(); private final Lock _stateChangeLock = new ReentrantLock(); - private static final AtomicLong idGenerator = new AtomicLong(0); - // Create a simple ID that increments for ever new Subscription private boolean _isActive = true; - private Subscription _subscription; - public MockSubscription() + public MockConsumer() { _messageIds = null; } - public MockSubscription(List<String> messageIds) + public MockConsumer(List<String> messageIds) { _messageIds = messageIds; } @@ -177,13 +173,12 @@ public class MockSubscription implements SubscriptionTarget } @Override - public void subscriptionRegistered(final Subscription sub) + public void consumerAdded(final Consumer sub) { - _subscription = sub; } @Override - public void subscriptionRemoved(final Subscription sub) + public void consumerRemoved(final Consumer sub) { } @@ -199,7 +194,7 @@ public class MockSubscription implements SubscriptionTarget } @Override - public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener) + public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener) { _listener = listener; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java index b2bc351f8f..52a53e8d38 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java @@ -25,7 +25,7 @@ import java.util.List; /** * Test SUB Log Messages */ -public class SubscriptionMessagesTest extends AbstractTestMessages +public class ConsumerMessagesTest extends AbstractTestMessages { public void testSubscriptionCreateALL() { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java index 17d68ef7c3..c1068b4a0b 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java @@ -38,7 +38,7 @@ public class VirtualHostMessagesTest extends AbstractTestMessages validateLogMessage(log, "VHT-1001", expected); } - public void testSubscriptionClosed() + public void testVirtualhostClosed() { _logMessage = VirtualHostMessages.CLOSED(); List<Object> log = performLog(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 0d38b7002a..7092b50c52 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -32,7 +32,7 @@ import java.util.ArrayList; import java.util.EnumSet; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.when; @@ -66,10 +66,10 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest queue.enqueue(createMessage(9L, (byte) 0)); // Register subscriber - queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class)); + queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class)); Thread.sleep(150); - ArrayList<MessageInstance> msgs = getSubscription().getMessages(); + ArrayList<MessageInstance> msgs = getConsumer().getMessages(); try { assertEquals(1L, msgs.get(0).getMessage().getMessageNumber()); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java new file mode 100644 index 0000000000..35508bb2c4 --- /dev/null +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java @@ -0,0 +1,445 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + + +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.test.utils.QpidTestCase; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumerListTest extends QpidTestCase +{ + private QueueConsumerList _subList; + private QueueConsumer _sub1; + private QueueConsumer _sub2; + private QueueConsumer _sub3; + private QueueConsumerList.ConsumerNode _node; + + protected void setUp() + { + _subList = new QueueConsumerList(); + + _sub1 = newMockConsumer(); + _sub2 = newMockConsumer(); + _sub3 = newMockConsumer(); + + _subList.add(_sub1); + _subList.add(_sub2); + _subList.add(_sub3); + + _node = _subList.getHead(); + } + + + private QueueConsumer newMockConsumer() + { + ConsumerTarget target = mock(ConsumerTarget.class); + when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class)); + return new QueueConsumer(null,null,true,true,"sub",false,target); + } + + /** + * Test that if the first (non-head) node in the list is deleted (but is still present), + * it is not returned when searching through the list for the next viable node, and the + * subsequent viable node is returned instead. + */ + public void testFindNextSkipsFirstDeletedNode() + { + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub1).delete()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 2nd consumer", _sub2, _node.getConsumer()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer()); + } + + /** + * Test that if a central node in the list is deleted (but is still present), + * it is not returned when searching through the list for the next viable node, + * and the subsequent viable node is returned instead. + */ + public void testFindNextSkipsCentralDeletedNode() + { + assertNotNull("Returned node should not be null", _node = _node.findNext()); + + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub2).delete()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer()); + } + + /** + * Test that if the last node in the list is deleted (but is still present), + * it is not returned when searching through the list for the next viable node, + * and null is returned instead. + */ + public void testFindNextSkipsLastDeletedNode() + { + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 1st consumer", _sub1, _node.getConsumer()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 2nd consumer", _sub2, _node.getConsumer()); + + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub3).delete()); + + assertNull("Returned node should be null", _node = _node.findNext()); + } + + /** + * Test that if multiple nodes in the list are deleted (but still present), they + * are not returned when searching through the list for the next viable node, + * and the subsequent viable node is returned instead. + */ + public void testFindNextSkipsMultipleDeletedNode() + { + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub1).delete()); + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub2).delete()); + + assertNotNull("Returned node should not be null", _node = _node.findNext()); + assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer()); + } + + /** + * Test that if a node in the list is marked 'deleted' it is still present in the list + * until actually removed. counter-test to verify above testing of getNext() method. + */ + public void testDeletedNodeStillPresent() + { + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub1).delete()); + + assertNotNull("Node marked deleted should still be present", getNodeForConsumer(_subList, _sub1)); + assertEquals("All 3 nodes are still expected to be present", 3, countNodes(_subList)); + } + + /** + * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given + * Consumer, or null if none is found. + */ + private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final Consumer sub) + { + QueueConsumerList.ConsumerNode node = list.getHead(); + while (node != null && node.getConsumer() != sub) + { + node = node.nextNode(); + } + + return node; + } + + /** + * Counts the number of (non-head) nodes in the list. + */ + private int countNodes(final QueueConsumerList list) + { + QueueConsumerList.ConsumerNode node = list.getHead(); + int count; + for(count = -1; node != null; count++) + { + node = node.nextNode(); + } + + return count; + } + + /** + * Tests that the head is returned as expected, and isn't the node for the first consumer. + */ + public void testGetHead() + { + assertNotNull("List head should be non null", _node); + assertNotSame("Head should not be node for first consumer", + _node, getNodeForConsumer(_subList, _sub1)); + } + + /** + * Tests that the size is returned correctly in the face of additions and removals. + */ + public void testGetSize() + { + QueueConsumerList subList = new QueueConsumerList(); + + assertEquals("Unexpected size result", 0, subList.size()); + + QueueConsumer sub1 = newMockConsumer(); + QueueConsumer sub2 = newMockConsumer(); + QueueConsumer sub3 = newMockConsumer(); + + subList.add(sub1); + assertEquals("Unexpected size result", 1, subList.size()); + + subList.add(sub2); + assertEquals("Unexpected size result", 2, subList.size()); + + subList.add(sub3); + assertEquals("Unexpected size result", 3, subList.size()); + + assertTrue("Removing consumer from list should have succeeded", subList.remove(sub1)); + assertEquals("Unexpected size result", 2, subList.size()); + + assertTrue("Removing consumer from list should have succeeded", subList.remove(sub2)); + assertEquals("Unexpected size result", 1, subList.size()); + + assertTrue("Removing consumer from list should have succeeded", subList.remove(sub3)); + assertEquals("Unexpected size result", 0, subList.size()); + } + + /** + * Test that if the first (non-head) node in the list is removed it is no longer + * present in the node structure of the list at all. + */ + public void testRemoveFirstNode() + { + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1)); + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1)); + assertNull("Should not have been a node present for the removed consumer", + getNodeForConsumer(_subList, _sub1)); + assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2)); + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3)); + } + + /** + * Test that if a central node in the list is removed it is no longer + * present in the node structure of the list at all. + */ + public void testRemoveCentralNode() + { + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2)); + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub2)); + assertNull("Should not have been a node present for the removed consumer", + getNodeForConsumer(_subList, _sub2)); + assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1)); + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3)); + } + + /** + * Test that if the consumer contained in the last node of the list is removed + * it is no longer present in the node structure of the list at all. However, + * as the last node in the structure can't actually be removed a dummy will instead + * be present. + */ + public void testRemoveLastNode() + { + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3)); + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3)); + assertNull("Should not have been a node present for the removed consumer", + getNodeForConsumer(_subList, _sub3)); + + //We actually expect 3 nodes to remain this time, because the last node cant be removed for thread safety reasons, + //however a dummy final node can be used as substitute to allow removal of the consumer node. + assertEquals("Unexpected number of nodes", 2 + 1, countNodes(_subList)); + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1)); + assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2)); + } + + /** + * Test that if the consumer not contained in the list is requested to be removed + * that the removal fails + */ + public void testRemoveNonexistentNode() + { + QueueConsumer sub4 = newMockConsumer(); + assertNull("Should not have been a node present for the consumer", getNodeForConsumer(_subList, sub4)); + assertFalse("Removing consumer node should not have succeeded", _subList.remove(sub4)); + assertEquals("Unexpected number of nodes", 3, countNodes(_subList)); + } + + /** + * Test that if a consumer node which occurs later in the main list than the marked node is + * removed from the list after the marked node is also removed, then the marker node doesn't + * serve to retain the subsequent nodes in the list structure (and thus memory) despite their + * removal. + */ + public void testDeletedMarkedNodeDoesntLeakSubsequentlyDeletedNodes() + { + //get the nodes out the list for the 1st and 3rd consumers + QueueConsumerList.ConsumerNode sub1Node = getNodeForConsumer(_subList, _sub1); + assertNotNull("Should have been a node present for the consumer", sub1Node); + QueueConsumerList.ConsumerNode sub3Node = getNodeForConsumer(_subList, _sub3); + assertNotNull("Should have been a node present for the consumer", sub3Node); + + //mark the first consumer node + assertTrue("should have succeeded in updating the marked node", + _subList.updateMarkedNode(_subList.getMarkedNode(), sub1Node)); + + //remove the 1st consumer from the list + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1)); + //verify the 1st consumer is no longer the marker node (replaced by a dummy), or in the main list structure + assertNotSame("Unexpected marker node", sub1Node, _subList.getMarkedNode()); + assertNull("Should not have been a node present in the list structure for the marked-but-removed sub1 node", + getNodeForConsumer(_subList, _sub1)); + + //remove the 2nd consumer from the list + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub2)); + + //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node + //in its list structure is now the 3rd consumer (since the 2nd was removed too) + assertEquals("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode()); + + //remove the 3rd and final/tail consumer + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3)); + + //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node + //in its list structure is now the dummy tail (since the 3rd consumer was removed, and a dummy + //tail was inserted) and NOT the 3rd sub node. + assertNotSame("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode()); + assertTrue("Unexpected next node", _subList.getMarkedNode().nextNode().isDeleted()); + assertNull("Next non-deleted node from the marker should now be the list end, i.e. null", _subList.getMarkedNode().findNext()); + } + + /** + * Test that the marked node 'findNext' behaviour is as expected after a consumer is added + * to the list following the tail consumer node being removed while it is the marked node. + * That is, that the new consumers node is returned by getMarkedNode().findNext(). + */ + public void testMarkedNodeFindsNewConsumerAfterRemovingTailWhilstMarked() + { + //get the node out the list for the 3rd consumer + QueueConsumerList.ConsumerNode sub3Node = getNodeForConsumer(_subList, _sub3); + assertNotNull("Should have been a node present for the consumer", sub3Node); + + //mark the 3rd consumer node + assertTrue("should have succeeded in updating the marked node", + _subList.updateMarkedNode(_subList.getMarkedNode(), sub3Node)); + + //verify calling findNext on the marked node returns null, i.e. the end of the list has been reached + assertEquals("Unexpected node after marked node", null, _subList.getMarkedNode().findNext()); + + //remove the 3rd(marked) consumer from the list + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3)); + + //add a new 4th consumer to the list + QueueConsumer sub4 = newMockConsumer(); + _subList.add(sub4); + + //get the node out the list for the 4th consumer + QueueConsumerList.ConsumerNode sub4Node = getNodeForConsumer(_subList, sub4); + assertNotNull("Should have been a node present for the consumer", sub4Node); + + //verify the marked node (which is now a dummy substitute for the 3rd consumer) returns + //the 4th consumers node as the next non-deleted node. + assertEquals("Unexpected next node", sub4Node, _subList.getMarkedNode().findNext()); + } + + /** + * Test that setting the marked node to null doesn't cause problems during remove operations + */ + public void testRemoveWithNullMarkedNode() + { + //set the marker to null + assertTrue("should have succeeded in updating the marked node", + _subList.updateMarkedNode(_subList.getMarkedNode(), null)); + + //remove the 1st consumer from the main list + assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1)); + + //verify the 1st consumer is no longer in the main list structure + assertNull("Should not have been a node present in the main list structure for sub1", + getNodeForConsumer(_subList, _sub1)); + assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); + } + + /** + * Tests that after the first (non-head) node of the list is marked deleted but has not + * yet been removed, the iterator still skips it. + */ + public void testIteratorSkipsFirstDeletedNode() + { + //'delete' but don't remove the node for the 1st consumer + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub1).delete()); + assertNotNull("Should still have been a node present for the deleted consumer", + getNodeForConsumer(_subList, _sub1)); + + QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator(); + + //verify the iterator returns the 2nd consumers node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected ConsumerNode", _sub2, iter.getNode().getConsumer()); + + //verify the iterator returns the 3rd consumers node and not the 2nd. + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected ConsumerNode", _sub3, iter.getNode().getConsumer()); + } + + /** + * Tests that after a central node of the list is marked deleted but has not yet been removed, + * the iterator still skips it. + */ + public void testIteratorSkipsCentralDeletedNode() + { + //'delete' but don't remove the node for the 2nd consumer + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub2).delete()); + assertNotNull("Should still have been a node present for the deleted consumer", + getNodeForConsumer(_subList, _sub2)); + + QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator(); + + //verify the iterator returns the 1st consumers node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected ConsumerNode", _sub1, iter.getNode().getConsumer()); + + //verify the iterator returns the 3rd consumers node and not the 2nd. + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected ConsumerNode", _sub3, iter.getNode().getConsumer()); + } + + /** + * Tests that after the last node of the list is marked deleted but has not yet been removed, + * the iterator still skips it. + */ + public void testIteratorSkipsDeletedFinalNode() + { + //'delete' but don't remove the node for the 3rd consumer + assertTrue("Deleting consumer node should have succeeded", + getNodeForConsumer(_subList, _sub3).delete()); + assertNotNull("Should still have been a node present for the deleted 3rd consumer", + getNodeForConsumer(_subList, _sub3)); + + QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator(); + + //verify the iterator returns the 1st consumers node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected ConsumerNode", _sub1, iter.getNode().getConsumer()); + + //verify the iterator returns the 2nd consumers node + assertTrue("Iterator should have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected ConsumerNode", _sub2, iter.getNode().getConsumer()); + + //verify the iterator can no longer advance and does not return a consumer node + assertFalse("Iterator should not have been able to advance", iter.advance()); + assertEquals("Iterator returned unexpected ConsumerNode", null, iter.getNode()); + } +} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 623118de02..36fd92004a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -29,8 +29,8 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -207,15 +207,15 @@ public class MockAMQQueue implements AMQQueue } @Override - public Subscription registerSubscription(final SubscriptionTarget target, - final FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final String consumerName, - final EnumSet<Subscription.Option> options) throws AMQException + public Consumer addConsumer(final ConsumerTarget target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + final EnumSet<Consumer.Option> options) throws AMQException { - return new QueueSubscription(filters, messageClass, options.contains(Subscription.Option.ACQUIRES), - options.contains(Subscription.Option.SEES_REQUEUES), consumerName, - options.contains(Subscription.Option.TRANSIENT), target ); + return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES), + options.contains(Consumer.Option.SEES_REQUEUES), consumerName, + options.contains(Consumer.Option.TRANSIENT), target ); } public String getName() @@ -224,22 +224,18 @@ public class MockAMQQueue implements AMQQueue } - public void unregisterSubscription(Subscription subscription) throws AMQException - { - - } - public Collection<Subscription> getConsumers() + public Collection<Consumer> getConsumers() { return Collections.emptyList(); } - public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) { } - public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener) + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) { } @@ -254,7 +250,7 @@ public class MockAMQQueue implements AMQQueue return 0; } - public boolean hasExclusiveSubscriber() + public boolean hasExclusiveConsumer() { return false; } @@ -318,11 +314,11 @@ public class MockAMQQueue implements AMQQueue { } - public void dequeue(QueueEntry entry, Subscription sub) + public void dequeue(QueueEntry entry, Consumer sub) { } - public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException + public boolean resend(QueueEntry entry, Consumer consumer) throws AMQException { return false; } @@ -431,12 +427,12 @@ public class MockAMQQueue implements AMQQueue return null; } - public void flushSubscription(Subscription sub) throws AMQException + public void flushConsumer(Consumer sub) throws AMQException { } - public void deliverAsync(Subscription sub) + public void deliverAsync(Consumer sub) { } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index e74bc992a2..b3a124a6bd 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -26,7 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -41,7 +41,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public boolean acquire(Subscription sub) + public boolean acquire(Consumer sub) { return false; } @@ -52,12 +52,12 @@ public class MockQueueEntry implements QueueEntry return 0; } - public boolean acquiredBySubscription() + public boolean acquiredByConsumer() { return false; } - public boolean isAcquiredBy(Subscription subscription) + public boolean isAcquiredBy(Consumer consumer) { return false; } @@ -87,7 +87,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public Subscription getDeliveredSubscription() + public Consumer getDeliveredConsumer() { return null; } @@ -125,7 +125,7 @@ public class MockQueueEntry implements QueueEntry } - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(Consumer consumer) { return false; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index c2099ec186..0b2c6c900c 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -21,10 +21,11 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageInstance.EntryState; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import java.lang.reflect.Field; @@ -112,17 +113,17 @@ public abstract class QueueEntryImplTestBase extends TestCase */ private void acquire() { - _queueEntry.acquire(newMockSubscription()); + _queueEntry.acquire(newMockConsumer()); assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method", _queueEntry.isAcquired()); } - private Subscription newMockSubscription() + private Consumer newMockConsumer() { - final Subscription subscription = mock(Subscription.class); - when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription)); - when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); - return subscription; + final Consumer consumer = mock(Consumer.class); + when(consumer.getOwningState()).thenReturn(new MessageInstance.ConsumerAcquiredState(consumer)); + when(consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement()); + return consumer; } /** @@ -147,34 +148,34 @@ public abstract class QueueEntryImplTestBase extends TestCase } /** - * Tests rejecting a queue entry records the Subscription ID - * for later verification by isRejectedBy(subscriptionId). + * Tests rejecting a queue entry records the Consumer ID + * for later verification by isRejectedBy(consumerId). */ public void testRejectAndRejectedBy() { - Subscription sub = newMockSubscription(); + Consumer sub = newMockConsumer(); - assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub)); - assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired()); + assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub)); + assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired()); - //acquire, reject, and release the message using the subscription + //acquire, reject, and release the message using the consumer assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub)); _queueEntry.reject(); _queueEntry.release(); //verify the rejection is recorded - assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub)); + assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub)); - //repeat rejection using a second subscription - Subscription sub2 = newMockSubscription(); + //repeat rejection using a second consumer + Consumer sub2 = newMockConsumer(); - assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2)); + assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2)); assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2)); _queueEntry.reject(); - //verify it still records being rejected by both subscriptions - assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub)); - assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2)); + //verify it still records being rejected by both consumers + assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub)); + assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub2)); } /** diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 51ae822b2e..542f6ba0d1 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -45,9 +45,9 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; -import org.apache.qpid.server.subscription.MockSubscription; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.consumer.MockConsumer; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -69,8 +69,8 @@ public class SimpleAMQQueueTest extends QpidTestCase private String _owner = "owner"; private String _routingKey = "routing key"; private DirectExchange _exchange; - private MockSubscription _subscriptionTarget = new MockSubscription(); - private QueueSubscription _subscription; + private MockConsumer _consumerTarget = new MockConsumer(); + private QueueConsumer _consumer; private Map<String,Object> _arguments = null; @Override @@ -162,13 +162,13 @@ public class SimpleAMQQueueTest extends QpidTestCase } - public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException + public void testRegisterConsumerThenEnqueueMessage() throws AMQException { ServerMessage messageA = createMessage(new Long(24)); - // Check adding a subscription adds it to the queue - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + // Check adding a consumer adds it to the queue + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, @@ -183,49 +183,49 @@ public class SimpleAMQQueueTest extends QpidTestCase catch(InterruptedException e) { } - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull(_subscription.getQueueContext().getReleasedEntry()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull(_consumer.getQueueContext().getReleasedEntry()); - // Check removing the subscription removes it's information from the queue - _subscription.close(); - assertTrue("Subscription still had queue", _subscriptionTarget.isClosed()); + // Check removing the consumer removes it's information from the queue + _consumer.close(); + assertTrue("Consumer still had queue", _consumerTarget.isClosed()); assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); assertFalse("Queue still has active consumer", 1 == _queue.getActiveConsumerCount()); ServerMessage messageB = createMessage(new Long (25)); _queue.enqueue(messageB); - assertNull(_subscription.getQueueContext()); + assertNull(_consumer.getQueueContext()); } - public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException + public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException { ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); Thread.sleep(150); - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** * Tests enqueuing two messages. */ - public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception + public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); _queue.enqueue(messageA); _queue.enqueue(messageB); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); Thread.sleep(150); - assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** @@ -240,9 +240,9 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage messageC = createMessage(new Long(26)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -261,7 +261,9 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -272,12 +274,14 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 4, + _consumerTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** @@ -289,9 +293,9 @@ public class SimpleAMQQueueTest extends QpidTestCase { ServerMessage messageA = createMessage(new Long(24)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -313,7 +317,9 @@ public class SimpleAMQQueueTest extends QpidTestCase int subFlushWaitTime = 150; Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 1, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ @@ -323,10 +329,12 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); - assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size()); + assertEquals("Total number of messages sent should not have changed", + 1, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } @@ -343,9 +351,9 @@ public class SimpleAMQQueueTest extends QpidTestCase ServerMessage messageB = createMessage(new Long(25)); ServerMessage messageC = createMessage(new Long(26)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>() @@ -364,7 +372,9 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); @@ -376,35 +386,37 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size()); + assertEquals("Unexpected total number of messages sent to consumer", + 5, + _consumerTarget.getMessages().size()); assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); assertNull("releasedEntry should be cleared after requeue processed", - _subscription.getQueueContext().getReleasedEntry()); + _consumer.getQueueContext().getReleasedEntry()); } /** - * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a + * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a * requeue resends a message to a <i>single</i> subscriber. */ - public void testReleaseForQueueWithMultipleSubscriptions() throws Exception + public void testReleaseForQueueWithMultipleConsumers() throws Exception { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); - MockSubscription target1 = new MockSubscription(); - MockSubscription target2 = new MockSubscription(); + MockConsumer target1 = new MockConsumer(); + MockConsumer target2 = new MockConsumer(); - QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); - QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES)); + QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); @@ -433,22 +445,22 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - assertEquals("Unexpected total number of messages sent to both subscriptions after release", + assertEquals("Unexpected total number of messages sent to both consumers after release", 3, target1.getMessages().size() + target2.getMessages().size()); assertNull("releasedEntry should be cleared after requeue processed", - subscription1.getQueueContext().getReleasedEntry()); + consumer1.getQueueContext().getReleasedEntry()); assertNull("releasedEntry should be cleared after requeue processed", - subscription2.getQueueContext().getReleasedEntry()); + consumer2.getQueueContext().getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException { ServerMessage messageA = createMessage(new Long(24)); - // Check adding an exclusive subscription adds it to the queue + // Check adding an exclusive consumer adds it to the queue - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.EXCLUSIVE)); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); @@ -464,16 +476,16 @@ public class SimpleAMQQueueTest extends QpidTestCase catch (InterruptedException e) { } - assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue - MockSubscription subB = new MockSubscription(); + MockConsumer subB = new MockConsumer(); Exception ex = null; try { - _queue.registerSubscription(subB, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); } catch (AMQException e) @@ -483,16 +495,16 @@ public class SimpleAMQQueueTest extends QpidTestCase assertNotNull(ex); // Check we cannot add an exclusive subscriber to a queue with an - // existing subscription - _subscription.close(); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + // existing consumer + _consumer.close(); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); try { - _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test", - EnumSet.of(Subscription.Option.EXCLUSIVE)); + _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE)); } catch (AMQException e) @@ -509,12 +521,12 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.setDeleteOnNoConsumers(true); ServerMessage message = createMessage(new Long(25)); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); _queue.enqueue(message); - _subscription.close(); - assertTrue("Queue was not deleted when subscription was removed", + _consumer.close(); + assertTrue("Queue was not deleted when consumer was removed", _queue.isDeleted()); } @@ -523,13 +535,13 @@ public class SimpleAMQQueueTest extends QpidTestCase Long id = new Long(26); ServerMessage message = createMessage(id); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test", - EnumSet.noneOf(Subscription.Option.class)); + _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", + EnumSet.noneOf(Consumer.Option.class)); _queue.enqueue(message); - QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); + QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry(); entry.setRedelivered(); - _subscription.resend(entry); + _consumer.resend(entry); } @@ -656,19 +668,19 @@ public class SimpleAMQQueueTest extends QpidTestCase /** * processQueue() is used when asynchronously delivering messages to - * subscriptions which could not be delivered immediately during the + * consumers which could not be delivered immediately during the * enqueue() operation. * * A defect within the method would mean that delivery of these messages may * not occur should the Runner stop before all messages have been processed. * Such a defect was discovered when Selectors were used such that one and - * only one subscription can/will accept any given messages, but multiple - * subscriptions are present, and one of the earlier subscriptions receives + * only one consumer can/will accept any given messages, but multiple + * consumers are present, and one of the earlier consumers receives * more messages than the others. * * This test is to validate that the processQueue() method is able to * correctly deliver all of the messages present for asynchronous delivery - * to subscriptions in such a scenario. + * to consumers in such a scenario. */ public void testProcessQueueWithUniqueSelectors() throws Exception { @@ -677,10 +689,10 @@ public class SimpleAMQQueueTest extends QpidTestCase false, false, _virtualHost, factory, null) { @Override - public void deliverAsync(QueueSubscription sub) + public void deliverAsync(QueueConsumer sub) { // do nothing, i.e prevent deliveries by the SubFlushRunner - // when registering the new subscriptions + // when registering the new consumers } }; @@ -696,28 +708,28 @@ public class SimpleAMQQueueTest extends QpidTestCase QueueEntry msg4 = list.add(createMessage(4L)); QueueEntry msg5 = list.add(createMessage(5L)); - // Create lists of the entries each subscription should be interested - // in.Bias over 50% of the messages to the first subscription so that - // the later subscriptions reject them and report being done before - // the first subscription as the processQueue method proceeds. + // Create lists of the entries each consumer should be interested + // in.Bias over 50% of the messages to the first consumer so that + // the later consumers reject them and report being done before + // the first consumer as the processQueue method proceeds. List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3); List<String> msgListSub2 = createEntriesList(msg4); List<String> msgListSub3 = createEntriesList(msg5); - MockSubscription sub1 = new MockSubscription(msgListSub1); - MockSubscription sub2 = new MockSubscription(msgListSub2); - MockSubscription sub3 = new MockSubscription(msgListSub3); + MockConsumer sub1 = new MockConsumer(msgListSub1); + MockConsumer sub2 = new MockConsumer(msgListSub2); + MockConsumer sub3 = new MockConsumer(msgListSub3); - // register the subscriptions - testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); - testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); - testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); + // register the consumers + testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); //check that no messages have been delivered to the - //subscriptions during registration + //consumers during registration assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); @@ -904,7 +916,7 @@ public class SimpleAMQQueueTest extends QpidTestCase false, "testOwner", false, false, _virtualHost, null) { @Override - public void deliverAsync(QueueSubscription sub) + public void deliverAsync(QueueConsumer sub) { // do nothing } @@ -919,8 +931,8 @@ public class SimpleAMQQueueTest extends QpidTestCase // latch to wait for message receipt final CountDownLatch latch = new CountDownLatch(messageNumber -1); - // create a subscription - MockSubscription subscription = new MockSubscription() + // create a consumer + MockConsumer consumer = new MockConsumer() { /** * Send a message and decrement latch @@ -937,7 +949,11 @@ public class SimpleAMQQueueTest extends QpidTestCase try { // subscribe - testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + testQueue.addConsumer(consumer, + null, + entries.get(0).getMessage().getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -962,11 +978,11 @@ public class SimpleAMQQueueTest extends QpidTestCase Thread.currentThread().interrupt(); } List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3)); - verifyReceivedMessages(expected, subscription.getMessages()); + verifyReceivedMessages(expected, consumer.getMessages()); } /** - * Tests that entry in dequeued state are not enqueued and not delivered to subscription + * Tests that entry in dequeued state are not enqueued and not delivered to consumer */ public void testEnqueueDequeuedEntry() { @@ -1002,7 +1018,7 @@ public class SimpleAMQQueueTest extends QpidTestCase } @Override - public boolean acquire(Subscription sub) + public boolean acquire(Consumer sub) { if(message.getMessageNumber() % 2 == 0) { @@ -1018,24 +1034,28 @@ public class SimpleAMQQueueTest extends QpidTestCase }; } }, null); - // create a subscription - MockSubscription subscription = new MockSubscription(); + // create a consumer + MockConsumer consumer = new MockConsumer(); - // register subscription + // register consumer try { - queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + queue.addConsumer(consumer, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); } catch (AMQException e) { - fail("Failure to register subscription:" + e.getMessage()); + fail("Failure to register consumer:" + e.getMessage()); } // put test messages into a queue putGivenNumberOfMessages(queue, 4); // assert received messages - List<MessageInstance> messages = subscription.getMessages(); + List<MessageInstance> messages = consumer.getMessages(); assertEquals("Only 2 messages should be returned", 2, messages.size()); assertEquals("ID of first message should be 1", 1l, (messages.get(0).getMessage()).getMessageNumber()); @@ -1048,52 +1068,60 @@ public class SimpleAMQQueueTest extends QpidTestCase final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false, "testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); - //verify adding an active subscription increases the count - final MockSubscription subscription1 = new MockSubscription(); - subscription1.setActive(true); - subscription1.setState(SubscriptionTarget.State.ACTIVE); + //verify adding an active consumer increases the count + final MockConsumer consumer1 = new MockConsumer(); + consumer1.setActive(true); + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + queue.addConsumer(consumer1, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify adding an inactive subscription doesn't increase the count - final MockSubscription subscription2 = new MockSubscription(); - subscription2.setActive(false); - subscription2.setState(SubscriptionTarget.State.SUSPENDED); + //verify adding an inactive consumer doesn't increase the count + final MockConsumer consumer2 = new MockConsumer(); + consumer2.setActive(false); + consumer2.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class)); + queue.addConsumer(consumer2, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.noneOf(Consumer.Option.class)); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); //verify behaviour in face of expected state changes: - //verify a subscription going suspended->active increases the count - subscription2.setState(SubscriptionTarget.State.ACTIVE); + //verify a consumer going suspended->active increases the count + consumer2.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); - //verify a subscription going active->suspended decreases the count - subscription2.setState(SubscriptionTarget.State.SUSPENDED); + //verify a consumer going active->suspended decreases the count + consumer2.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going suspended->closed doesn't change the count - subscription2.setState(SubscriptionTarget.State.CLOSED); + //verify a consumer going suspended->closed doesn't change the count + consumer2.setState(ConsumerTarget.State.CLOSED); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going active->active doesn't change the count - subscription1.setState(SubscriptionTarget.State.ACTIVE); + //verify a consumer going active->active doesn't change the count + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - subscription1.setState(SubscriptionTarget.State.SUSPENDED); + consumer1.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - //verify a subscription going suspended->suspended doesn't change the count - subscription1.setState(SubscriptionTarget.State.SUSPENDED); + //verify a consumer going suspended->suspended doesn't change the count + consumer1.setState(ConsumerTarget.State.SUSPENDED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - subscription1.setState(SubscriptionTarget.State.ACTIVE); + consumer1.setState(ConsumerTarget.State.ACTIVE); assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - //verify a subscription going active->closed decreases the count - subscription1.setState(SubscriptionTarget.State.CLOSED); + //verify a consumer going active->closed decreases the count + consumer1.setState(ConsumerTarget.State.CLOSED); assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); } @@ -1248,9 +1276,9 @@ public class SimpleAMQQueueTest extends QpidTestCase return _queue; } - public MockSubscription getSubscription() + public MockConsumer getConsumer() { - return _subscriptionTarget; + return _consumerTarget; } public Map<String,Object> getArguments() diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java deleted file mode 100644 index 65ae53a3a7..0000000000 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - - -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; -import org.apache.qpid.test.utils.QpidTestCase; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class SubscriptionListTest extends QpidTestCase -{ - private SubscriptionList _subList; - private QueueSubscription _sub1; - private QueueSubscription _sub2; - private QueueSubscription _sub3; - private SubscriptionList.SubscriptionNode _node; - - protected void setUp() - { - _subList = new SubscriptionList(); - - _sub1 = newMockSubscription(); - _sub2 = newMockSubscription(); - _sub3 = newMockSubscription(); - - _subList.add(_sub1); - _subList.add(_sub2); - _subList.add(_sub3); - - _node = _subList.getHead(); - } - - - private QueueSubscription newMockSubscription() - { - SubscriptionTarget target = mock(SubscriptionTarget.class); - when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class)); - return new QueueSubscription(null,null,true,true,"sub",false,target); - } - - /** - * Test that if the first (non-head) node in the list is deleted (but is still present), - * it is not returned when searching through the list for the next viable node, and the - * subsequent viable node is returned instead. - */ - public void testFindNextSkipsFirstDeletedNode() - { - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub1).delete()); - - assertNotNull("Returned node should not be null", _node = _node.findNext()); - assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription()); - - assertNotNull("Returned node should not be null", _node = _node.findNext()); - assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription()); - } - - /** - * Test that if a central node in the list is deleted (but is still present), - * it is not returned when searching through the list for the next viable node, - * and the subsequent viable node is returned instead. - */ - public void testFindNextSkipsCentralDeletedNode() - { - assertNotNull("Returned node should not be null", _node = _node.findNext()); - - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub2).delete()); - - assertNotNull("Returned node should not be null", _node = _node.findNext()); - assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription()); - } - - /** - * Test that if the last node in the list is deleted (but is still present), - * it is not returned when searching through the list for the next viable node, - * and null is returned instead. - */ - public void testFindNextSkipsLastDeletedNode() - { - assertNotNull("Returned node should not be null", _node = _node.findNext()); - assertEquals("Should have returned node for 1st subscription", _sub1, _node.getSubscription()); - - assertNotNull("Returned node should not be null", _node = _node.findNext()); - assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription()); - - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub3).delete()); - - assertNull("Returned node should be null", _node = _node.findNext()); - } - - /** - * Test that if multiple nodes in the list are deleted (but still present), they - * are not returned when searching through the list for the next viable node, - * and the subsequent viable node is returned instead. - */ - public void testFindNextSkipsMultipleDeletedNode() - { - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub1).delete()); - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub2).delete()); - - assertNotNull("Returned node should not be null", _node = _node.findNext()); - assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription()); - } - - /** - * Test that if a node in the list is marked 'deleted' it is still present in the list - * until actually removed. counter-test to verify above testing of getNext() method. - */ - public void testDeletedNodeStillPresent() - { - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub1).delete()); - - assertNotNull("Node marked deleted should still be present", getNodeForSubscription(_subList, _sub1)); - assertEquals("All 3 nodes are still expected to be present", 3, countNodes(_subList)); - } - - /** - * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given - * Subscription, or null if none is found. - */ - private SubscriptionList.SubscriptionNode getNodeForSubscription(final SubscriptionList list, final Subscription sub) - { - SubscriptionList.SubscriptionNode node = list.getHead(); - while (node != null && node.getSubscription() != sub) - { - node = node.nextNode(); - } - - return node; - } - - /** - * Counts the number of (non-head) nodes in the list. - */ - private int countNodes(final SubscriptionList list) - { - SubscriptionList.SubscriptionNode node = list.getHead(); - int count; - for(count = -1; node != null; count++) - { - node = node.nextNode(); - } - - return count; - } - - /** - * Tests that the head is returned as expected, and isn't the node for the first subscription. - */ - public void testGetHead() - { - assertNotNull("List head should be non null", _node); - assertNotSame("Head should not be node for first subscription", - _node, getNodeForSubscription(_subList, _sub1)); - } - - /** - * Tests that the size is returned correctly in the face of additions and removals. - */ - public void testGetSize() - { - SubscriptionList subList = new SubscriptionList(); - - assertEquals("Unexpected size result", 0, subList.size()); - - QueueSubscription sub1 = newMockSubscription(); - QueueSubscription sub2 = newMockSubscription(); - QueueSubscription sub3 = newMockSubscription(); - - subList.add(sub1); - assertEquals("Unexpected size result", 1, subList.size()); - - subList.add(sub2); - assertEquals("Unexpected size result", 2, subList.size()); - - subList.add(sub3); - assertEquals("Unexpected size result", 3, subList.size()); - - assertTrue("Removing subscription from list should have succeeded", subList.remove(sub1)); - assertEquals("Unexpected size result", 2, subList.size()); - - assertTrue("Removing subscription from list should have succeeded", subList.remove(sub2)); - assertEquals("Unexpected size result", 1, subList.size()); - - assertTrue("Removing subscription from list should have succeeded", subList.remove(sub3)); - assertEquals("Unexpected size result", 0, subList.size()); - } - - /** - * Test that if the first (non-head) node in the list is removed it is no longer - * present in the node structure of the list at all. - */ - public void testRemoveFirstNode() - { - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1)); - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1)); - assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub1)); - assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2)); - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3)); - } - - /** - * Test that if a central node in the list is removed it is no longer - * present in the node structure of the list at all. - */ - public void testRemoveCentralNode() - { - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2)); - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2)); - assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub2)); - assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1)); - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3)); - } - - /** - * Test that if the subscription contained in the last node of the list is removed - * it is no longer present in the node structure of the list at all. However, - * as the last node in the structure can't actually be removed a dummy will instead - * be present. - */ - public void testRemoveLastNode() - { - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3)); - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3)); - assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub3)); - - //We actually expect 3 nodes to remain this time, because the last node cant be removed for thread safety reasons, - //however a dummy final node can be used as substitute to allow removal of the subscription node. - assertEquals("Unexpected number of nodes", 2 + 1, countNodes(_subList)); - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1)); - assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2)); - } - - /** - * Test that if the subscription not contained in the list is requested to be removed - * that the removal fails - */ - public void testRemoveNonexistentNode() - { - QueueSubscription sub4 = newMockSubscription(); - assertNull("Should not have been a node present for the subscription", getNodeForSubscription(_subList, sub4)); - assertFalse("Removing subscription node should not have succeeded", _subList.remove(sub4)); - assertEquals("Unexpected number of nodes", 3, countNodes(_subList)); - } - - /** - * Test that if a subscription node which occurs later in the main list than the marked node is - * removed from the list after the marked node is also removed, then the marker node doesn't - * serve to retain the subsequent nodes in the list structure (and thus memory) despite their - * removal. - */ - public void testDeletedMarkedNodeDoesntLeakSubsequentlyDeletedNodes() - { - //get the nodes out the list for the 1st and 3rd subscriptions - SubscriptionList.SubscriptionNode sub1Node = getNodeForSubscription(_subList, _sub1); - assertNotNull("Should have been a node present for the subscription", sub1Node); - SubscriptionList.SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3); - assertNotNull("Should have been a node present for the subscription", sub3Node); - - //mark the first subscription node - assertTrue("should have succeeded in updating the marked node", - _subList.updateMarkedNode(_subList.getMarkedNode(), sub1Node)); - - //remove the 1st subscription from the list - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1)); - //verify the 1st subscription is no longer the marker node (replaced by a dummy), or in the main list structure - assertNotSame("Unexpected marker node", sub1Node, _subList.getMarkedNode()); - assertNull("Should not have been a node present in the list structure for the marked-but-removed sub1 node", - getNodeForSubscription(_subList, _sub1)); - - //remove the 2nd subscription from the list - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2)); - - //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node - //in its list structure is now the 3rd subscription (since the 2nd was removed too) - assertEquals("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode()); - - //remove the 3rd and final/tail subscription - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3)); - - //verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node - //in its list structure is now the dummy tail (since the 3rd subscription was removed, and a dummy - //tail was inserted) and NOT the 3rd sub node. - assertNotSame("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode()); - assertTrue("Unexpected next node", _subList.getMarkedNode().nextNode().isDeleted()); - assertNull("Next non-deleted node from the marker should now be the list end, i.e. null", _subList.getMarkedNode().findNext()); - } - - /** - * Test that the marked node 'findNext' behaviour is as expected after a subscription is added - * to the list following the tail subscription node being removed while it is the marked node. - * That is, that the new subscriptions node is returned by getMarkedNode().findNext(). - */ - public void testMarkedNodeFindsNewSubscriptionAfterRemovingTailWhilstMarked() - { - //get the node out the list for the 3rd subscription - SubscriptionList.SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3); - assertNotNull("Should have been a node present for the subscription", sub3Node); - - //mark the 3rd subscription node - assertTrue("should have succeeded in updating the marked node", - _subList.updateMarkedNode(_subList.getMarkedNode(), sub3Node)); - - //verify calling findNext on the marked node returns null, i.e. the end of the list has been reached - assertEquals("Unexpected node after marked node", null, _subList.getMarkedNode().findNext()); - - //remove the 3rd(marked) subscription from the list - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3)); - - //add a new 4th subscription to the list - QueueSubscription sub4 = newMockSubscription(); - _subList.add(sub4); - - //get the node out the list for the 4th subscription - SubscriptionList.SubscriptionNode sub4Node = getNodeForSubscription(_subList, sub4); - assertNotNull("Should have been a node present for the subscription", sub4Node); - - //verify the marked node (which is now a dummy substitute for the 3rd subscription) returns - //the 4th subscriptions node as the next non-deleted node. - assertEquals("Unexpected next node", sub4Node, _subList.getMarkedNode().findNext()); - } - - /** - * Test that setting the marked node to null doesn't cause problems during remove operations - */ - public void testRemoveWithNullMarkedNode() - { - //set the marker to null - assertTrue("should have succeeded in updating the marked node", - _subList.updateMarkedNode(_subList.getMarkedNode(), null)); - - //remove the 1st subscription from the main list - assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1)); - - //verify the 1st subscription is no longer in the main list structure - assertNull("Should not have been a node present in the main list structure for sub1", - getNodeForSubscription(_subList, _sub1)); - assertEquals("Unexpected number of nodes", 2, countNodes(_subList)); - } - - /** - * Tests that after the first (non-head) node of the list is marked deleted but has not - * yet been removed, the iterator still skips it. - */ - public void testIteratorSkipsFirstDeletedNode() - { - //'delete' but don't remove the node for the 1st subscription - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub1).delete()); - assertNotNull("Should still have been a node present for the deleted subscription", - getNodeForSubscription(_subList, _sub1)); - - SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator(); - - //verify the iterator returns the 2nd subscriptions node - assertTrue("Iterator should have been able to advance", iter.advance()); - assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription()); - - //verify the iterator returns the 3rd subscriptions node and not the 2nd. - assertTrue("Iterator should have been able to advance", iter.advance()); - assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription()); - } - - /** - * Tests that after a central node of the list is marked deleted but has not yet been removed, - * the iterator still skips it. - */ - public void testIteratorSkipsCentralDeletedNode() - { - //'delete' but don't remove the node for the 2nd subscription - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub2).delete()); - assertNotNull("Should still have been a node present for the deleted subscription", - getNodeForSubscription(_subList, _sub2)); - - SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator(); - - //verify the iterator returns the 1st subscriptions node - assertTrue("Iterator should have been able to advance", iter.advance()); - assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription()); - - //verify the iterator returns the 3rd subscriptions node and not the 2nd. - assertTrue("Iterator should have been able to advance", iter.advance()); - assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription()); - } - - /** - * Tests that after the last node of the list is marked deleted but has not yet been removed, - * the iterator still skips it. - */ - public void testIteratorSkipsDeletedFinalNode() - { - //'delete' but don't remove the node for the 3rd subscription - assertTrue("Deleting subscription node should have succeeded", - getNodeForSubscription(_subList, _sub3).delete()); - assertNotNull("Should still have been a node present for the deleted 3rd subscription", - getNodeForSubscription(_subList, _sub3)); - - SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator(); - - //verify the iterator returns the 1st subscriptions node - assertTrue("Iterator should have been able to advance", iter.advance()); - assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription()); - - //verify the iterator returns the 2nd subscriptions node - assertTrue("Iterator should have been able to advance", iter.advance()); - assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription()); - - //verify the iterator can no longer advance and does not return a subscription node - assertFalse("Iterator should not have been able to advance", iter.advance()); - assertEquals("Iterator returned unexpected SubscriptionNode", null, iter.getNode()); - } -} diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 45643e2f9d..2093490ee2 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.AMQException; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -34,8 +33,8 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -46,7 +45,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener +public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { private static final Option[] BATCHED = new Option[] { Option.BATCH }; @@ -69,16 +68,16 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen private final Map<String, Object> _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; - private Subscription _subscription; + private Consumer _consumer; - public SubscriptionTarget_0_10(ServerSession session, - String name, - MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, - MessageFlowMode flowMode, - FlowCreditManager_0_10 creditManager, - Map<String, Object> arguments) + public ConsumerTarget_0_10(ServerSession session, + String name, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + MessageFlowMode flowMode, + FlowCreditManager_0_10 creditManager, + Map<String, Object> arguments) { super(State.SUSPENDED); _session = session; @@ -93,9 +92,9 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen _name = name; } - public Subscription getSubscription() + public Consumer getConsumer() { - return _subscription; + return _consumer; } public boolean isSuspended() @@ -108,7 +107,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen boolean closed = false; State state = getState(); - getSubscription().getSendLock(); + getConsumer().getSendLock(); try { while(!closed && state != State.CLOSED) @@ -124,7 +123,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } finally { - getSubscription().releaseSendLock(); + getConsumer().releaseSendLock(); } return closed; @@ -255,8 +254,8 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - xfr = batch ? new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody()); + xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) + : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody()); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) { @@ -353,7 +352,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen { entry.setRedelivered(); entry.routeToAlternate(null, null); - if(entry.isAcquiredBy(getSubscription())) + if(entry.isAcquiredBy(getConsumer())) { entry.delete(); } @@ -450,7 +449,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen { try { - getSubscription().getSendLock(); + getConsumer().getSendLock(); updateState(State.ACTIVE, State.SUSPENDED); _stopped.set(true); @@ -459,7 +458,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } finally { - getSubscription().releaseSendLock(); + getConsumer().releaseSendLock(); } } @@ -519,7 +518,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen public void acknowledge(MessageInstance entry) { // TODO Fix Store Context / cleanup - if(entry.isAcquiredBy(getSubscription())) + if(entry.isAcquiredBy(getConsumer())) { _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); _unacknowledgedCount.decrementAndGet(); @@ -530,7 +529,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen public void flush() throws AMQException { flushCreditState(true); - getSubscription().flush(); + getConsumer().flush(); stop(); } @@ -560,13 +559,13 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen @Override - public void subscriptionRegistered(final Subscription sub) + public void consumerAdded(final Consumer sub) { - _subscription = sub; + _consumer = sub; } @Override - public void subscriptionRemoved(final Subscription sub) + public void consumerRemoved(final Consumer sub) { } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index b7788911c7..4420709a91 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener @@ -32,9 +31,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final MessageInstance _entry; - private final SubscriptionTarget_0_10 _target; + private final ConsumerTarget_0_10 _target; - public ExplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target) + public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) { _entry = entry; _target = target; @@ -42,7 +41,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onAccept() { - if(_target != null && _entry.isAcquiredBy(_target.getSubscription())) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { _target.getSessionModel().acknowledge(_target, _entry); } @@ -55,7 +54,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_target != null && _entry.isAcquiredBy(_target.getSubscription())) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { _target.release(_entry, setRedelivered); } @@ -67,7 +66,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_target != null && _entry.isAcquiredBy(_target.getSubscription())) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { _target.reject(_entry); } @@ -80,7 +79,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - return _entry.acquire(_target.getSubscription()); + return _entry.acquire(_target.getConsumer()); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java index 0c238c4d55..c459364dbb 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener { @@ -31,9 +30,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final MessageInstance _entry; - private SubscriptionTarget_0_10 _target; + private ConsumerTarget_0_10 _target; - public ImplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target) + public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) { _entry = entry; _target = target; @@ -46,7 +45,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_entry.isAcquiredBy(_target.getSubscription())) + if(_entry.isAcquiredBy(_target.getConsumer())) { _target.release(_entry, setRedelivered); } @@ -58,7 +57,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_entry.isAcquiredBy(_target.getSubscription())) + if(_entry.isAcquiredBy(_target.getConsumer())) { _target.reject(_entry); } @@ -71,7 +70,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - boolean acquired = _entry.acquire(_target.getSubscription()); + boolean acquired = _entry.acquire(_target.getConsumer()); if(acquired) { _target.recordUnacknowledged(_entry); diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index 71ad60c7b8..cd1146ac0b 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -22,17 +22,16 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.transport.Method; public class MessageAcceptCompletionListener implements Method.CompletionListener { - private final SubscriptionTarget_0_10 _sub; + private final ConsumerTarget_0_10 _sub; private final MessageInstance _entry; private final ServerSession _session; private boolean _restoreCredit; - public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) + public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) { super(); _sub = sub; @@ -47,7 +46,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene { _sub.restoreCredit(_entry.getMessage()); } - if(_entry.isAcquiredBy(_sub.getSubscription())) + if(_entry.isAcquiredBy(_sub.getConsumer())) { _session.acknowledge(_sub, _entry); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 6348510b09..c85a415ce5 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate private void stopAllSubscriptions(Connection conn, SessionDetach dtc) { final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); - final Collection<SubscriptionTarget_0_10> subs = ssn.getSubscriptions(); - for (SubscriptionTarget_0_10 subscription_0_10 : subs) + final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subs) { subscription_0_10.stop(); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 73263bd931..67b3740bb6 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -63,7 +63,6 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; -import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.DistributedTransaction; @@ -137,7 +136,7 @@ public class ServerSession extends Session private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>(); + private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>(); private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>(); @@ -400,7 +399,7 @@ public class ServerSession extends Session // Broker shouldn't block awaiting close - thus do override this method to do nothing } - public void acknowledge(final SubscriptionTarget_0_10 sub, final MessageInstance entry) + public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry) { _transaction.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() @@ -421,22 +420,22 @@ public class ServerSession extends Session }); } - public Collection<SubscriptionTarget_0_10> getSubscriptions() + public Collection<ConsumerTarget_0_10> getSubscriptions() { return _subscriptions.values(); } - public void register(String destination, SubscriptionTarget_0_10 sub) + public void register(String destination, ConsumerTarget_0_10 sub) { _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } - public SubscriptionTarget_0_10 getSubscription(String destination) + public ConsumerTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); } - public void unregister(SubscriptionTarget_0_10 sub) + public void unregister(ConsumerTarget_0_10 sub) { _subscriptions.remove(sub.getName()); sub.close(); @@ -808,8 +807,8 @@ public class ServerSession extends Session void unregisterSubscriptions() { - final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); - for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } @@ -817,8 +816,8 @@ public class ServerSession extends Session void stopSubscriptions() { - final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); - for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.stop(); } @@ -827,8 +826,8 @@ public class ServerSession extends Session public void receivedComplete() { - final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); - for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.flushCreditState(false); } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 515c1e6f8f..d1b24a2fdb 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -46,7 +46,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -257,7 +257,7 @@ public class ServerSessionDelegate extends SessionDelegate return; } - SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination, + ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination, method.getAcceptMode(), method.getAcquireMode(), MessageFlowMode.WINDOW, @@ -268,31 +268,31 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).register(destination, target); try { - EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); + EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) { - options.add(Subscription.Option.ACQUIRES); + options.add(Consumer.Option.ACQUIRES); } if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - options.add(Subscription.Option.SEES_REQUEUES); + options.add(Consumer.Option.SEES_REQUEUES); } if(method.getExclusive()) { - options.add(Subscription.Option.EXCLUSIVE); + options.add(Consumer.Option.EXCLUSIVE); } - Subscription sub = - queue.registerSubscription(target, - filterManager, - MessageTransferMessage.class, - destination, - options); + Consumer sub = + queue.addConsumer(target, + filterManager, + MessageTransferMessage.class, + destination, + options); } - catch (AMQQueue.ExistingExclusiveSubscription existing) + catch (AMQQueue.ExistingExclusiveConsumer existing) { exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer"); } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive) + catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive) { exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); } @@ -405,7 +405,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -422,7 +422,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1476,7 +1476,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = sfm.getDestination(); - SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1493,7 +1493,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = stop.getDestination(); - SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1511,7 +1511,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = flow.getDestination(); - SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index ce1a132973..d2b1f83513 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -69,8 +69,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionTarget; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -113,7 +112,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>(); + private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>(); private final MessageStore _messageStore; @@ -488,10 +487,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - public Subscription getSubscription(AMQShortString tag) + public Consumer getSubscription(AMQShortString tag) { - final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); - return target == null ? null : target.getSubscription(); + final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); + return target == null ? null : target.getConsumer(); } /** @@ -522,30 +521,30 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F throw new AMQException("Consumer already exists with same tag: " + tag); } - SubscriptionTarget_0_8 target; - EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); + ConsumerTarget_0_8 target; + EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); - options.add(Subscription.Option.TRANSIENT); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + options.add(Consumer.Option.TRANSIENT); } else if(acks) { - target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager); - options.add(Subscription.Option.ACQUIRES); - options.add(Subscription.Option.SEES_REQUEUES); + target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); } else { - target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); - options.add(Subscription.Option.ACQUIRES); - options.add(Subscription.Option.SEES_REQUEUES); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); } if(exclusive) { - options.add(Subscription.Option.EXCLUSIVE); + options.add(Consumer.Option.EXCLUSIVE); } // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. @@ -557,8 +556,12 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F try { - Subscription sub = - queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options); + Consumer sub = + queue.addConsumer(target, + FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), + AMQMessage.class, + AMQShortString.toString(tag), + options); } catch (AMQException e) { @@ -582,8 +585,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException { - SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); - Subscription sub = target == null ? null : target.getSubscription(); + ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); + Consumer sub = target == null ? null : target.getConsumer(); if (sub != null) { sub.close(); @@ -651,14 +654,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet()) + for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet()) { if (_logger.isInfoEnabled()) { _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Subscription sub = me.getValue().getSubscription(); + Consumer sub = me.getValue().getConsumer(); sub.close(); @@ -674,14 +677,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * @param entry the record of the message on the queue that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) - * @param subscription The consumer that is to acknowledge this message. + * @param consumer The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Subscription subscription) + public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer) { if (_logger.isDebugEnabled()) { _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag - + ") for " + subscription); + + ") for " + consumer); } @@ -928,9 +931,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (wasSuspended) { // may need to deliver queued messages - for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values()) + for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { - s.getSubscription().externalStateChange(); + s.getConsumer().externalStateChange(); } } @@ -944,15 +947,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (!wasSuspended) { // may need to deliver queued messages - for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values()) + for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { try { - s.getSubscription().getSendLock(); + s.getConsumer().getSendLock(); } finally { - s.getSubscription().releaseSendLock(); + s.getConsumer().releaseSendLock(); } } } @@ -1029,10 +1032,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F boolean requiresSuspend = _suspended.compareAndSet(false,true); // ensure all subscriptions have seen the change to the channel state - for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) + for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getSubscription().getSendLock(); - sub.getSubscription().releaseSendLock(); + sub.getConsumer().getSendLock(); + sub.getConsumer().releaseSendLock(); } try @@ -1052,7 +1055,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F for(MessageInstance entry : _resendList) { - Subscription sub = entry.getDeliveredSubscription(); + Consumer sub = entry.getDeliveredConsumer(); if(sub == null || sub.isClosed()) { entry.release(); @@ -1067,9 +1070,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(requiresSuspend) { _suspended.set(false); - for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) + for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getSubscription().externalStateChange(); + sub.getConsumer().externalStateChange(); } } @@ -1125,7 +1128,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } @@ -1472,7 +1475,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F else { final ServerMessage msg = rejectedQueueEntry.getMessage(); - final Subscription sub = rejectedQueueEntry.getDeliveredSubscription(); + final Consumer sub = rejectedQueueEntry.getDeliveredConsumer(); int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>() { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 131b88fee4..e83e86981b 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -94,7 +94,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -1668,7 +1668,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final Subscription sub, final ServerMessage message, + public void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index 7f52aa9a08..2e362c11f8 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -23,10 +23,10 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; public interface ClientDeliveryMethod { - void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props, + void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException; } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 9a6cec87bd..536bc18964 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -33,8 +33,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; @@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag * that was given out by the broker and the channel id. <p/> */ -public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener +public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener = @@ -70,23 +70,23 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - private Subscription _subscription; + private Consumer _consumer; - public static SubscriptionTarget_0_8 createBrowserTarget(AMQChannel channel, + public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) throws AMQException { - return new BrowserSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } - static final class BrowserSubscription extends SubscriptionTarget_0_8 + static final class BrowserConsumer extends ConsumerTarget_0_8 { - public BrowserSubscription(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + public BrowserConsumer(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { super(channel, consumerTag, @@ -124,31 +124,31 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } - public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel, + public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) throws AMQException { - return new NoAckSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } - public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel, + public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException { - return new NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } - public static class NoAckSubscription extends SubscriptionTarget_0_8 + public static class NoAckConsumer extends ConsumerTarget_0_8 { private final AutoCommitTransaction _txn; - public NoAckSubscription(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + public NoAckConsumer(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); @@ -221,13 +221,13 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget /** * NoAck Subscription for use with BasicGet method. */ - public static final class GetNoAckSubscription extends SubscriptionTarget_0_8.NoAckSubscription + public static final class GetNoAckConsumer extends NoAckConsumer { - public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession, + AMQShortString consumerTag, FieldTable filters, + boolean noLocal, FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); @@ -241,32 +241,32 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } - public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel, + public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) throws AMQException { - return new AckSubscription(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } - public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel, + public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException { - return new AckSubscription(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod); + return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod); } - static final class AckSubscription extends SubscriptionTarget_0_8 + static final class AckConsumer extends ConsumerTarget_0_8 { - public AckSubscription(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + public AckConsumer(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); @@ -305,7 +305,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } - private static final Logger _logger = Logger.getLogger(SubscriptionTarget_0_8.class); + private static final Logger _logger = Logger.getLogger(ConsumerTarget_0_8.class); private final AMQChannel _channel; @@ -320,12 +320,12 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget - public SubscriptionTarget_0_8(AMQChannel channel, - AMQShortString consumerTag, - FieldTable arguments, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + public ConsumerTarget_0_8(AMQChannel channel, + AMQShortString consumerTag, + FieldTable arguments, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { super(State.ACTIVE); @@ -357,20 +357,20 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } } - public Subscription getSubscription() + public Consumer getConsumer() { - return _subscription; + return _consumer; } @Override - public void subscriptionRemoved(final Subscription sub) + public void consumerRemoved(final Consumer sub) { } @Override - public void subscriptionRegistered(final Subscription sub) + public void consumerAdded(final Consumer sub) { - _subscription = sub; + _consumer = sub; } public AMQSessionModel getSessionModel() @@ -417,7 +417,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget boolean closed = false; State state = getState(); - getSubscription().getSendLock(); + getConsumer().getSendLock(); try { while(!closed && state != State.CLOSED) @@ -433,7 +433,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } finally { - getSubscription().releaseSendLock(); + getConsumer().releaseSendLock(); } } @@ -488,14 +488,14 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { - _deliveryMethod.deliverToClient(getSubscription(), message, props, deliveryTag); + _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); } protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag) { - _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag); + _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 06c1d79439..1de1638c2e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -24,11 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.consumer.Consumer; import java.util.Map; @@ -53,11 +49,11 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { message.setRedelivered(); - final Subscription subscription = message.getDeliveredSubscription(); - if (subscription != null) + final Consumer consumer = message.getDeliveredConsumer(); + if (consumer != null) { // Consumer exists - if (!subscription.isClosed()) + if (!consumer.isClosed()) { _msgToResend.put(deliveryTag, message); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java index 89a5aa55c1..70d7da3432 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -21,9 +21,9 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; public interface RecordDeliveryMethod { - void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag); + void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index 836de44f4e..c2d02c1df8 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -156,14 +156,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } - catch (AMQQueue.ExistingExclusiveSubscription e) + catch (AMQQueue.ExistingExclusiveConsumer e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " as it already has an existing exclusive consumer"); } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) + catch (AMQQueue.ExistingConsumerPreventsExclusive e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 53c0ae7381..d4bd486a99 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -38,14 +38,13 @@ import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8; +import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.EnumSet; @@ -133,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB { @Override - public void deliverToClient(final Subscription sub, final ServerMessage message, final + public void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { @@ -150,30 +149,30 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) { channel.addUnacknowledgedMessage(entry, deliveryTag, null); } }; - SubscriptionTarget_0_8 target; - EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES, - Subscription.Option.SEES_REQUEUES); + ConsumerTarget_0_8 target; + EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES); if(acks) { - target = SubscriptionTarget_0_8.createAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); + target = ConsumerTarget_0_8.createAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } else { - target = SubscriptionTarget_0_8.createNoAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); + target = ConsumerTarget_0_8.createNoAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } - Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options); + Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); sub.flush(); sub.close(); return(!singleMessageCredit.hasCredit()); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index d542c82a5b..d9352f34f7 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -30,10 +30,9 @@ import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -49,8 +48,8 @@ import java.util.Set; */ public class AckTest extends QpidTestCase { - private SubscriptionTarget_0_8 _subscriptionTarget; - private Subscription _subscription; + private ConsumerTarget_0_8 _subscriptionTarget; + private Consumer _consumer; private AMQProtocolSession _protocolSession; @@ -180,10 +179,13 @@ public class AckTest extends QpidTestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -207,16 +209,16 @@ public class AckTest extends QpidTestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, - null, - AMQMessage.class, - DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, + null, + AMQMessage.class, + DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -233,12 +235,12 @@ public class AckTest extends QpidTestCase { // false arg means no acks expected - _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); @@ -256,13 +258,13 @@ public class AckTest extends QpidTestCase public void testSingleAckReceivedTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -293,13 +295,13 @@ public class AckTest extends QpidTestCase public void testMultiAckReceivedTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -327,13 +329,13 @@ public class AckTest extends QpidTestCase public void testMultiAckAllReceivedTest() throws AMQException { - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, - DEFAULT_CONSUMER_TAG, - null, - new LimitlessCreditManager()); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -364,15 +366,15 @@ public class AckTest extends QpidTestCase Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); - _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); - _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Subscription.Option.SEES_REQUEUES, - Subscription.Option.ACQUIRES)); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 1; publishMessages(msgCount); - _subscription.externalStateChange(); + _consumer.externalStateChange(); _channel.acknowledgeMessage(1, false); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index 9fa9e49c6b..e8c922b023 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -27,9 +27,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -65,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase private static final int INITIAL_MSG_COUNT = 10; private AMQQueue _queue; private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); - private Subscription _subscription; + private Consumer _consumer; private boolean _queueDeleted; @Override @@ -76,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase _queue = mock(AMQQueue.class); when(_queue.getName()).thenReturn(getName()); when(_queue.isDeleted()).thenReturn(_queueDeleted); - _subscription = mock(Subscription.class); - when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement()); + _consumer = mock(Consumer.class); + when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement()); long id = 0; @@ -123,7 +121,7 @@ public class ExtractResendAndRequeueTest extends TestCase // Acquire messages in subscription for(QueueEntry entry : messageList) { - when(entry.getDeliveredSubscription()).thenReturn(_subscription); + when(entry.getDeliveredConsumer()).thenReturn(_consumer); } } @@ -168,7 +166,7 @@ public class ExtractResendAndRequeueTest extends TestCase acquireMessages(_referenceList); // Close subscription - when(_subscription.isClosed()).thenReturn(true); + when(_consumer.isClosed()).thenReturn(true); final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 94dd0acf99..1fad8fb41f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -49,7 +49,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -245,7 +245,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(Subscription sub, ServerMessage message, + public void deliverToClient(Consumer sub, ServerMessage message, InstanceProperties props, long deliveryTag) throws AMQException { _deliveryCount.incrementAndGet(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index 8c716a0b56..247bc53cd1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -131,7 +131,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase // indicate we are using the prefetch credit. i.e. using acks not No-Ack assertTrue("The subscription has been suspended", !getChannel().getSubscription(browser).getState() - .equals(Subscription.State.SUSPENDED)); + .equals(Consumer.State.SUSPENDED)); } private void checkStoreContents(int messageCount) diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index a3793392d5..027c40aabe 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -43,15 +43,14 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import java.nio.ByteBuffer; import java.util.List; -class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget +class ConsumerTarget_1_0 extends AbstractConsumerTarget { private final boolean _acquires; private SendingLink_1_0 _link; @@ -61,10 +60,10 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget private Binary _transactionId; private final AMQPDescribedTypeRegistry _typeRegistry; private final SectionEncoder _sectionEncoder; - private Subscription _subscription; + private Consumer _consumer; - public SubscriptionTarget_1_0(final SendingLink_1_0 link, - boolean acquires) + public ConsumerTarget_1_0(final SendingLink_1_0 link, + boolean acquires) { super(State.SUSPENDED); _link = link; @@ -73,9 +72,9 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget _acquires = acquires; } - public Subscription getSubscription() + public Consumer getConsumer() { - return _subscription; + return _consumer; } private SendingLinkEndpoint getEndpoint() @@ -94,7 +93,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget boolean closed = false; State state = getState(); - getSubscription().getSendLock(); + getConsumer().getSendLock(); try { while(!closed && state != State.CLOSED) @@ -109,7 +108,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget } finally { - getSubscription().releaseSendLock(); + getConsumer().releaseSendLock(); } } @@ -255,7 +254,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget public void onRollback() { - if(queueEntry.isAcquiredBy(getSubscription())) + if(queueEntry.isAcquiredBy(getConsumer())) { queueEntry.release(); _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); @@ -385,7 +384,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget public void postCommit() { - if(_queueEntry.isAcquiredBy(getSubscription())) + if(_queueEntry.isAcquiredBy(getConsumer())) { _queueEntry.delete(); } @@ -499,13 +498,13 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget } @Override - public void subscriptionRegistered(final Subscription sub) + public void consumerAdded(final Consumer sub) { - _subscription = sub; + _consumer = sub; } @Override - public void subscriptionRemoved(final Subscription sub) + public void consumerRemoved(final Consumer sub) { } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 390944ad3a..2fff1856c7 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -68,8 +68,7 @@ import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -82,8 +81,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private VirtualHost _vhost; private SendingDestination _destination; - private Subscription _subscription; - private SubscriptionTarget_1_0 _target; + private Consumer _consumer; + private ConsumerTarget_1_0 _target; private boolean _draining; private final Map<Binary, MessageInstance> _unsettledMap = @@ -112,7 +111,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS linkAttachment.setDeliveryStateHandler(this); QueueDestination qd = null; - EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); + EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); boolean noLocal = false; @@ -175,11 +174,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); + _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); if(source.getDistributionMode() != StdDistMode.COPY) { - options.add(Subscription.Option.ACQUIRES); - options.add(Subscription.Option.SEES_REQUEUES); + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); } } @@ -376,9 +375,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } - _target = new SubscriptionTarget_1_0(this, true); - options.add(Subscription.Option.ACQUIRES); - options.add(Subscription.Option.SEES_REQUEUES); + _target = new ConsumerTarget_1_0(this, true); + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); } else @@ -390,18 +389,18 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { if(noLocal) { - options.add(Subscription.Option.NO_LOCAL); + options.add(Consumer.Option.NO_LOCAL); } - _subscription.setNoLocal(noLocal); + _consumer.setNoLocal(noLocal); try { - _subscription = _queue.registerSubscription(_target, - messageFilter == null ? null : new SimpleFilterManager(messageFilter), - Message_1_0.class, getEndpoint().getName(), options); + _consumer = _queue.addConsumer(_target, + messageFilter == null ? null : new SimpleFilterManager(messageFilter), + Message_1_0.class, getEndpoint().getName(), options); } catch (AMQException e) { @@ -428,7 +427,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS try { - _subscription.close(); + _consumer.close(); } catch (AMQException e) @@ -622,7 +621,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment) { - if(_subscription.isActive()) + if(_consumer.isActive()) { _target.suspend(); } @@ -653,7 +652,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(outcome instanceof Accepted) { AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); - if(_subscription.acquires()) + if(_consumer.acquires()) { txn.dequeue(Collections.singleton(queueEntry), new ServerTransaction.Action() @@ -673,7 +672,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS else if(outcome instanceof Released) { AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); - if(_subscription.acquires()) + if(_consumer.acquires()) { txn.dequeue(Collections.singleton(queueEntry), new ServerTransaction.Action() diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index a71d833fc3..9ca23ce1ce 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -20,7 +20,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -40,7 +39,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -327,8 +326,8 @@ public class MessageServlet extends AbstractServlet : entry.isAcquired() ? "Acquired" : ""); - final Subscription deliveredSubscription = entry.getDeliveredSubscription(); - object.put("deliveredTo", deliveredSubscription == null ? null : deliveredSubscription.getSubscriptionID()); + final Consumer deliveredConsumer = entry.getDeliveredConsumer(); + object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getId()); ServerMessage message = entry.getMessage(); if(message != null) diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java index 4defbd7eb0..db6da6f739 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java @@ -46,7 +46,7 @@ import java.util.List; * SUB-1002 : Close * SUB-1003 : State : <state> */ -public class SubscriptionLoggingTest extends AbstractTestLogging +public class ConsumerLoggingTest extends AbstractTestLogging { static final String SUB_PREFIX = "SUB-"; |