diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 074f7c243b..d59b13902b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFinder; -import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; @@ -47,10 +46,11 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.MapValueConverter; -final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener +final class QueueAdapter extends AbstractAdapter implements Queue, + AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener { @SuppressWarnings("serial") static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{ @@ -69,8 +69,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs private final AMQQueue _queue; private final Map<Binding, BindingAdapter> _bindingAdapters = new HashMap<Binding, BindingAdapter>(); - private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters = - new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>(); + private Map<Consumer, ConsumerAdapter> _consumerAdapters = + new HashMap<Consumer, ConsumerAdapter>(); private final VirtualHostAdapter _vhost; @@ -84,7 +84,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter); _queue = queue; - _queue.addSubscriptionRegistrationListener(this); + _queue.addConsumerRegistrationListener(this); populateConsumers(); _statistics = new QueueStatisticsAdapter(queue); _queue.setNotificationListener(this); @@ -124,21 +124,21 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs private void populateConsumers() { - Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers(); + Collection<Consumer> actualConsumers = _queue.getConsumers(); synchronized (_consumerAdapters) { - Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator(); - for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions) + Iterator<Consumer> iter = _consumerAdapters.keySet().iterator(); + for(Consumer consumer : actualConsumers) { - if(!_consumerAdapters.containsKey(subscription)) + if(!_consumerAdapters.containsKey(consumer)) { - SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); - ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription); - _consumerAdapters.put(subscription, adapter); + SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel()); + ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, consumer); + _consumerAdapters.put(consumer, adapter); if (sessionAdapter != null) { // Register ConsumerAdapter with the SessionAdapter. - sessionAdapter.subscriptionRegistered(subscription, adapter); + sessionAdapter.consumerRegistered(consumer, adapter); } } } @@ -153,11 +153,11 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } } - public Collection<Consumer> getConsumers() + public Collection<org.apache.qpid.server.model.Consumer> getConsumers() { synchronized (_consumerAdapters) { - return new ArrayList<Consumer>(_consumerAdapters.values()); + return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values()); } } @@ -502,7 +502,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { - if(clazz == Consumer.class) + if(clazz == org.apache.qpid.server.model.Consumer.class) { return (Collection<C>) getConsumers(); } @@ -587,19 +587,19 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs return _queue; } - public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription) + public void consumerAdded(final AMQQueue queue, final Consumer consumer) { ConsumerAdapter adapter = null; synchronized (_consumerAdapters) { - if(!_consumerAdapters.containsKey(subscription)) + if(!_consumerAdapters.containsKey(consumer)) { - SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); - adapter = new ConsumerAdapter(this, sessionAdapter, subscription); - _consumerAdapters.put(subscription, adapter); + SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel()); + adapter = new ConsumerAdapter(this, sessionAdapter, consumer); + _consumerAdapters.put(consumer, adapter); if (sessionAdapter != null) { // Register ConsumerAdapter with the SessionAdapter. - sessionAdapter.subscriptionRegistered(subscription, adapter); + sessionAdapter.consumerRegistered(consumer, adapter); } } } @@ -609,20 +609,20 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } } - public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription) + public void consumerRemoved(final AMQQueue queue, final Consumer consumer) { ConsumerAdapter adapter = null; synchronized (_consumerAdapters) { - adapter = _consumerAdapters.remove(subscription); + adapter = _consumerAdapters.remove(consumer); } if(adapter != null) { - SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel()); + SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel()); if (sessionAdapter != null) { // Unregister ConsumerAdapter with the SessionAdapter. - sessionAdapter.subscriptionUnregistered(subscription); + sessionAdapter.consumerUnregistered(consumer); } childRemoved(adapter); } |