diff options
author | Keith Wall <kwall@apache.org> | 2014-07-22 22:31:17 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-07-22 22:31:17 +0000 |
commit | 46c17d266ac7c92bf62fc8135c2840bfe4cdf65b (patch) | |
tree | a1da01b9d53befe680fc09380c68d56327b5871b /java/broker-core/src | |
parent | bff408699f489c69ac22c984498558cbb4c8d390 (diff) | |
download | qpid-python-46c17d266ac7c92bf62fc8135c2840bfe4cdf65b.tar.gz |
QPID-5796: [Java Broker] Prevent possibility of AOOBE if connection registry is closed at the same time as closes are received
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1612716 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java | 87 |
1 files changed, 45 insertions, 42 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index c599ab4608..883785c7ce 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -20,10 +20,14 @@ */ package org.apache.qpid.server.connection; +import static java.util.Collections.newSetFromMap; + import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; @@ -32,85 +36,86 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; public class ConnectionRegistry implements IConnectionRegistry { - private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>(); - private Logger _logger = Logger.getLogger(ConnectionRegistry.class); - private final Collection<RegistryChangeListener> _listeners = - new ArrayList<RegistryChangeListener>(); + private final Set<AMQConnectionModel> _registry = newSetFromMap(new ConcurrentHashMap<AMQConnectionModel, Boolean>()); + private final Collection<RegistryChangeListener> _listeners = new ArrayList<>(); + + @Override public void initialise() { // None required } /** Close all of the currently open connections. */ + @Override public void close() { close(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT); } + @Override public void close(final String replyText) { - synchronized(this) - { - for(AMQConnectionModel conn : _registry) - { - conn.stop(); - } - } if (_logger.isDebugEnabled()) { _logger.debug("Closing connection registry :" + _registry.size() + " connections."); } - while (!_registry.isEmpty()) + for(AMQConnectionModel conn : _registry) { - AMQConnectionModel connection = _registry.get(0); + conn.stop(); + } - try - { - connection.close(AMQConstant.CONNECTION_FORCED, replyText); - } - catch (Exception e) + while (!_registry.isEmpty()) + { + Iterator<AMQConnectionModel> itr = _registry.iterator(); + while(itr.hasNext()) { - //remove this connection to ensure that we don't loop forever if it fails to close - _registry.remove(connection); - - _logger.warn("Exception closing connection " + connection.getConnectionId() + " from " + connection.getRemoteAddressString(), e); + AMQConnectionModel connection = itr.next(); + try + { + connection.close(AMQConstant.CONNECTION_FORCED, replyText); + } + catch (Exception e) + { + _logger.warn("Exception closing connection " + connection.getConnectionId() + " from " + connection.getRemoteAddressString(), e); + } + finally + { + itr.remove(); + } } } } + @Override public void registerConnection(AMQConnectionModel connection) { - synchronized (this) + _registry.add(connection); + synchronized (_listeners) { - _registry.add(connection); - synchronized (_listeners) + for(RegistryChangeListener listener : _listeners) { - for(RegistryChangeListener listener : _listeners) - { - listener.connectionRegistered(connection); - } + listener.connectionRegistered(connection); } } } + @Override public void deregisterConnection(AMQConnectionModel connection) { - synchronized (this) - { - _registry.remove(connection); + _registry.remove(connection); - synchronized (_listeners) + synchronized (_listeners) + { + for(RegistryChangeListener listener : _listeners) { - for(RegistryChangeListener listener : _listeners) - { - listener.connectionUnregistered(connection); - } + listener.connectionUnregistered(connection); } } } + @Override public void addRegistryChangeListener(RegistryChangeListener listener) { synchronized (_listeners) @@ -119,11 +124,9 @@ public class ConnectionRegistry implements IConnectionRegistry } } + @Override public List<AMQConnectionModel> getConnections() { - synchronized (this) - { - return new ArrayList<AMQConnectionModel>(_registry); - } + return new ArrayList<>(_registry); } } |