summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java54
1 files changed, 27 insertions, 27 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 074f7c243b..d59b13902b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -35,7 +35,6 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -47,10 +46,11 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.MapValueConverter;
-final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
+final class QueueAdapter extends AbstractAdapter implements Queue,
+ AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener
{
@SuppressWarnings("serial")
static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
@@ -69,8 +69,8 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
private final AMQQueue _queue;
private final Map<Binding, BindingAdapter> _bindingAdapters =
new HashMap<Binding, BindingAdapter>();
- private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters =
- new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>();
+ private Map<Consumer, ConsumerAdapter> _consumerAdapters =
+ new HashMap<Consumer, ConsumerAdapter>();
private final VirtualHostAdapter _vhost;
@@ -84,7 +84,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
_queue = queue;
- _queue.addSubscriptionRegistrationListener(this);
+ _queue.addConsumerRegistrationListener(this);
populateConsumers();
_statistics = new QueueStatisticsAdapter(queue);
_queue.setNotificationListener(this);
@@ -124,21 +124,21 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
private void populateConsumers()
{
- Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
+ Collection<Consumer> actualConsumers = _queue.getConsumers();
synchronized (_consumerAdapters)
{
- Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator();
- for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions)
+ Iterator<Consumer> iter = _consumerAdapters.keySet().iterator();
+ for(Consumer consumer : actualConsumers)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -153,11 +153,11 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
}
- public Collection<Consumer> getConsumers()
+ public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
synchronized (_consumerAdapters)
{
- return new ArrayList<Consumer>(_consumerAdapters.values());
+ return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
}
}
@@ -502,7 +502,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
return (Collection<C>) getConsumers();
}
@@ -587,19 +587,19 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
return _queue;
}
- public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerAdded(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -609,20 +609,20 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
}
- public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerRemoved(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- adapter = _consumerAdapters.remove(subscription);
+ adapter = _consumerAdapters.remove(consumer);
}
if(adapter != null)
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
if (sessionAdapter != null)
{ // Unregister ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionUnregistered(subscription);
+ sessionAdapter.consumerUnregistered(consumer);
}
childRemoved(adapter);
}