summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java92
1 files changed, 7 insertions, 85 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 12e9285af8..86e1bb0a8b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -133,7 +133,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Flag indicating to start dispatcher as a daemon thread
*/
- protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
+ protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
/** The connection to which this session belongs. */
private AMQConnection _connection;
@@ -187,7 +187,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
- private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
+ private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
/**
* Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
@@ -195,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private int _nextTag = 1;
- private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+ private final Map<Integer,C> _consumers = new ConcurrentHashMap<>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -294,12 +294,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
/**
- * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
- * consumer.
+ * Consumers associated with this session
*/
- protected IdToConsumerMap<C> getConsumers()
+ protected Collection<C> getConsumers()
{
- return _consumers;
+ return new ArrayList(_consumers.values());
}
protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
@@ -317,83 +316,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
- public static final class IdToConsumerMap<C extends BasicMessageConsumer>
- {
- private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
-
- public C get(int id)
- {
- if ((id & 0xFFFFFFF0) == 0)
- {
- return (C) _fastAccessConsumers[id];
- }
- else
- {
- return _slowAccessConsumers.get(id);
- }
- }
-
- public C put(int id, C consumer)
- {
- C oldVal;
- if ((id & 0xFFFFFFF0) == 0)
- {
- oldVal = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = consumer;
- }
- else
- {
- oldVal = _slowAccessConsumers.put(id, consumer);
- }
-
- return oldVal;
-
- }
-
- public C remove(int id)
- {
- C consumer;
- if ((id & 0xFFFFFFF0) == 0)
- {
- consumer = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = null;
- }
- else
- {
- consumer = _slowAccessConsumers.remove(id);
- }
-
- return consumer;
-
- }
-
- public Collection<C> values()
- {
- ArrayList<C> values = new ArrayList<C>();
-
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessConsumers[i] != null)
- {
- values.add((C) _fastAccessConsumers[i]);
- }
- }
- values.addAll(_slowAccessConsumers.values());
-
- return values;
- }
-
- public void clear()
- {
- _slowAccessConsumers.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessConsumers[i] = null;
- }
- }
- }
-
/**
* Creates a new session on a connection.
*
@@ -2490,7 +2412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber();
_dispatcherThread.setName(dispatcherThreadName);
- _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
+ _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD);
_dispatcher.setConnectionStopped(initiallyStopped);
_dispatcherThread.start();
if (_dispatcherLogger.isDebugEnabled())