summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-11 22:27:52 +0000
committerKeith Wall <kwall@apache.org>2015-02-11 22:27:52 +0000
commit90fcef0d551f0defd22a60b447446856cc39e750 (patch)
treed5c2fd9edc2c3349451f72b7e632b0a3bb121619
parent08f5f85f8e306c4dc20e75d976270c59753f54a4 (diff)
downloadqpid-python-90fcef0d551f0defd22a60b447446856cc39e750.tar.gz
QPID-6387: [Java Client] Remove array optimisation from session/consumer maps
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659103 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java92
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java93
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java2
5 files changed, 27 insertions, 173 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 12e9285af8..86e1bb0a8b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -133,7 +133,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Flag indicating to start dispatcher as a daemon thread
*/
- protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
+ protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
/** The connection to which this session belongs. */
private AMQConnection _connection;
@@ -187,7 +187,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
- private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
+ private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
/**
* Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
@@ -195,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private int _nextTag = 1;
- private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+ private final Map<Integer,C> _consumers = new ConcurrentHashMap<>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -294,12 +294,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
/**
- * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
- * consumer.
+ * Consumers associated with this session
*/
- protected IdToConsumerMap<C> getConsumers()
+ protected Collection<C> getConsumers()
{
- return _consumers;
+ return new ArrayList(_consumers.values());
}
protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
@@ -317,83 +316,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
- public static final class IdToConsumerMap<C extends BasicMessageConsumer>
- {
- private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
-
- public C get(int id)
- {
- if ((id & 0xFFFFFFF0) == 0)
- {
- return (C) _fastAccessConsumers[id];
- }
- else
- {
- return _slowAccessConsumers.get(id);
- }
- }
-
- public C put(int id, C consumer)
- {
- C oldVal;
- if ((id & 0xFFFFFFF0) == 0)
- {
- oldVal = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = consumer;
- }
- else
- {
- oldVal = _slowAccessConsumers.put(id, consumer);
- }
-
- return oldVal;
-
- }
-
- public C remove(int id)
- {
- C consumer;
- if ((id & 0xFFFFFFF0) == 0)
- {
- consumer = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = null;
- }
- else
- {
- consumer = _slowAccessConsumers.remove(id);
- }
-
- return consumer;
-
- }
-
- public Collection<C> values()
- {
- ArrayList<C> values = new ArrayList<C>();
-
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessConsumers[i] != null)
- {
- values.add((C) _fastAccessConsumers[i]);
- }
- }
- values.addAll(_slowAccessConsumers.values());
-
- return values;
- }
-
- public void clear()
- {
- _slowAccessConsumers.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessConsumers[i] = null;
- }
- }
- }
-
/**
* Creates a new session on a connection.
*
@@ -2490,7 +2412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber();
_dispatcherThread.setName(dispatcherThreadName);
- _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
+ _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD);
_dispatcher.setConnectionStopped(initiallyStopped);
_dispatcherThread.start();
if (_dispatcherLogger.isDebugEnabled())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index dc1f9a719e..206ca15c82 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -833,7 +833,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- for (BasicMessageConsumer consumer : getConsumers().values())
+ for (BasicMessageConsumer consumer : getConsumers())
{
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
Option.UNRELIABLE);
@@ -842,7 +842,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- for (BasicMessageConsumer_0_10 consumer : getConsumers().values())
+ for (BasicMessageConsumer_0_10 consumer : getConsumers())
{
String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
@@ -1320,7 +1320,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
drainDispatchQueue();
setUsingDispatcherForCleanup(false);
- for (BasicMessageConsumer consumer : getConsumers().values())
+ for (BasicMessageConsumer consumer : getConsumers())
{
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
getPrefetchedMessageTags().addAll(tags);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 143de271a1..5fb9329af7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -27,6 +27,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W
import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -330,10 +331,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
_logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags());
}
- ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values());
boolean messageListenerFound = false;
boolean serverRejectBehaviourFound = false;
- for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
+ for(BasicMessageConsumer_0_8 consumer : getConsumers())
{
if (consumer.isMessageListenerSet())
{
@@ -344,7 +344,6 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
serverRejectBehaviourFound = true;
}
}
- _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)");
if (serverRejectBehaviourFound)
{
@@ -376,7 +375,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
boolean normalRejectBehaviour = true;
- for (BasicMessageConsumer_0_8 consumer : getConsumers().values())
+ for (BasicMessageConsumer_0_8 consumer : getConsumers())
{
if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
index 2fdb35de49..f46c61daa7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
@@ -22,106 +22,45 @@ package org.apache.qpid.client;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public final class ChannelToSessionMap
{
- private final AMQSession[] _fastAccessSessions = new AMQSession[16];
- private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
- private int _size = 0;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>();
private AtomicInteger _idFactory = new AtomicInteger(0);
private int _maxChannelID;
private int _minChannelID;
public AMQSession get(int channelId)
{
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- return _fastAccessSessions[channelId];
- }
- else
- {
- return _slowAccessSessions.get(channelId);
- }
+ return _sessionMap.get(channelId);
}
- public AMQSession put(int channelId, AMQSession session)
+ public void put(int channelId, AMQSession session)
{
- AMQSession oldVal;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- oldVal = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = session;
- }
- else
- {
- oldVal = _slowAccessSessions.put(channelId, session);
- }
- if ((oldVal != null) && (session == null))
- {
- _size--;
- }
- else if ((oldVal == null) && (session != null))
- {
- _size++;
- }
-
- return session;
-
+ _sessionMap.put(channelId, session);
}
- public AMQSession remove(int channelId)
+ public void remove(int channelId)
{
- AMQSession session;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- session = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = null;
- }
- else
- {
- session = _slowAccessSessions.remove(channelId);
- }
-
- if (session != null)
- {
- _size--;
- }
- return session;
-
+ _sessionMap.remove(channelId);
}
public Collection<AMQSession> values()
{
- ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
-
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessSessions[i] != null)
- {
- values.add(_fastAccessSessions[i]);
- }
- }
- values.addAll(_slowAccessSessions.values());
-
- return values;
+ return new ArrayList<>(_sessionMap.values());
}
public int size()
{
- return _size;
+ return _sessionMap.size();
}
public void clear()
{
- _size = 0;
- _slowAccessSessions.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessSessions[i] = null;
- }
+ _sessionMap.clear();
}
/*
@@ -141,14 +80,8 @@ public final class ChannelToSessionMap
//go back to the start
_idFactory.set(_minChannelID);
}
- if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- done = (_fastAccessSessions[id] == null);
- }
- else
- {
- done = (!_slowAccessSessions.keySet().contains(id));
- }
+
+ done = (!_sessionMap.keySet().contains(id));
}
return id;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index d9514338ce..d625a9ae69 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -29,7 +29,7 @@ import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
/**
- * This class implements the javax.njms.XAConnection interface
+ * This class implements the javax.jms.XAConnection interface
*/
public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection
{