diff options
author | Keith Wall <kwall@apache.org> | 2015-02-11 22:27:52 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-02-11 22:27:52 +0000 |
commit | 90fcef0d551f0defd22a60b447446856cc39e750 (patch) | |
tree | d5c2fd9edc2c3349451f72b7e632b0a3bb121619 | |
parent | 08f5f85f8e306c4dc20e75d976270c59753f54a4 (diff) | |
download | qpid-python-90fcef0d551f0defd22a60b447446856cc39e750.tar.gz |
QPID-6387: [Java Client] Remove array optimisation from session/consumer maps
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659103 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 27 insertions, 173 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 12e9285af8..86e1bb0a8b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -133,7 +133,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Flag indicating to start dispatcher as a daemon thread */ - protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); + protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); /** The connection to which this session belongs. */ private AMQConnection _connection; @@ -187,7 +187,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ - private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); + private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); /** * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume @@ -195,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private int _nextTag = 1; - private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); + private final Map<Integer,C> _consumers = new ConcurrentHashMap<>(); /** * Contains a list of consumers which have been removed but which might still have @@ -294,12 +294,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } /** - * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right - * consumer. + * Consumers associated with this session */ - protected IdToConsumerMap<C> getConsumers() + protected Collection<C> getConsumers() { - return _consumers; + return new ArrayList(_consumers.values()); } protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) @@ -317,83 +316,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; - public static final class IdToConsumerMap<C extends BasicMessageConsumer> - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } - - public Collection<C> values() - { - ArrayList<C> values = new ArrayList<C>(); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); - - return values; - } - - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } - /** * Creates a new session on a connection. * @@ -2490,7 +2412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber(); _dispatcherThread.setName(dispatcherThreadName); - _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD); + _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD); _dispatcher.setConnectionStopped(initiallyStopped); _dispatcherThread.start(); if (_dispatcherLogger.isDebugEnabled()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index dc1f9a719e..206ca15c82 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -833,7 +833,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - for (BasicMessageConsumer consumer : getConsumers().values()) + for (BasicMessageConsumer consumer : getConsumers()) { getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), Option.UNRELIABLE); @@ -842,7 +842,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - for (BasicMessageConsumer_0_10 consumer : getConsumers().values()) + for (BasicMessageConsumer_0_10 consumer : getConsumers()) { String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null @@ -1320,7 +1320,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic drainDispatchQueue(); setUsingDispatcherForCleanup(false); - for (BasicMessageConsumer consumer : getConsumers().values()) + for (BasicMessageConsumer consumer : getConsumers()) { List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 143de271a1..5fb9329af7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -27,6 +27,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -330,10 +331,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags()); } - ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; - for(BasicMessageConsumer_0_8 consumer : consumersToCheck) + for(BasicMessageConsumer_0_8 consumer : getConsumers()) { if (consumer.isMessageListenerSet()) { @@ -344,7 +344,6 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe serverRejectBehaviourFound = true; } } - _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)"); if (serverRejectBehaviourFound) { @@ -376,7 +375,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // consumer on the queue. Whilst this is within the JMS spec it is not // user friendly and avoidable. boolean normalRejectBehaviour = true; - for (BasicMessageConsumer_0_8 consumer : getConsumers().values()) + for (BasicMessageConsumer_0_8 consumer : getConsumers()) { if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java index 2fdb35de49..f46c61daa7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -22,106 +22,45 @@ package org.apache.qpid.client; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public final class ChannelToSessionMap { - private final AMQSession[] _fastAccessSessions = new AMQSession[16]; - private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>(); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; public AMQSession get(int channelId) { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } + return _sessionMap.get(channelId); } - public AMQSession put(int channelId, AMQSession session) + public void put(int channelId, AMQSession session) { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - + _sessionMap.put(channelId, session); } - public AMQSession remove(int channelId) + public void remove(int channelId) { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - + _sessionMap.remove(channelId); } public Collection<AMQSession> values() { - ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; + return new ArrayList<>(_sessionMap.values()); } public int size() { - return _size; + return _sessionMap.size(); } public void clear() { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } + _sessionMap.clear(); } /* @@ -141,14 +80,8 @@ public final class ChannelToSessionMap //go back to the start _idFactory.set(_minChannelID); } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } + + done = (!_sessionMap.keySet().contains(id)); } return id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index d9514338ce..d625a9ae69 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -29,7 +29,7 @@ import javax.jms.XATopicConnection; import javax.jms.XATopicSession; /** - * This class implements the javax.njms.XAConnection interface + * This class implements the javax.jms.XAConnection interface */ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection { |