diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 92 |
1 files changed, 7 insertions, 85 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 12e9285af8..86e1bb0a8b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -133,7 +133,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Flag indicating to start dispatcher as a daemon thread */ - protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); + protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); /** The connection to which this session belongs. */ private AMQConnection _connection; @@ -187,7 +187,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ - private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); + private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); /** * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume @@ -195,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private int _nextTag = 1; - private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); + private final Map<Integer,C> _consumers = new ConcurrentHashMap<>(); /** * Contains a list of consumers which have been removed but which might still have @@ -294,12 +294,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } /** - * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right - * consumer. + * Consumers associated with this session */ - protected IdToConsumerMap<C> getConsumers() + protected Collection<C> getConsumers() { - return _consumers; + return new ArrayList(_consumers.values()); } protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) @@ -317,83 +316,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; - public static final class IdToConsumerMap<C extends BasicMessageConsumer> - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } - - public Collection<C> values() - { - ArrayList<C> values = new ArrayList<C>(); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); - - return values; - } - - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } - /** * Creates a new session on a connection. * @@ -2490,7 +2412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber(); _dispatcherThread.setName(dispatcherThreadName); - _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD); + _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD); _dispatcher.setConnectionStopped(initiallyStopped); _dispatcherThread.start(); if (_dispatcherLogger.isDebugEnabled()) |