summaryrefslogtreecommitdiff
path: root/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-07-22 22:31:17 +0000
committerKeith Wall <kwall@apache.org>2014-07-22 22:31:17 +0000
commit46c17d266ac7c92bf62fc8135c2840bfe4cdf65b (patch)
treea1da01b9d53befe680fc09380c68d56327b5871b /java/broker-core/src
parentbff408699f489c69ac22c984498558cbb4c8d390 (diff)
downloadqpid-python-46c17d266ac7c92bf62fc8135c2840bfe4cdf65b.tar.gz
QPID-5796: [Java Broker] Prevent possibility of AOOBE if connection registry is closed at the same time as closes are received
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1612716 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java87
1 files changed, 45 insertions, 42 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index c599ab4608..883785c7ce 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.server.connection;
+import static java.util.Collections.newSetFromMap;
+
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
@@ -32,85 +36,86 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
public class ConnectionRegistry implements IConnectionRegistry
{
- private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
-
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
- private final Collection<RegistryChangeListener> _listeners =
- new ArrayList<RegistryChangeListener>();
+ private final Set<AMQConnectionModel> _registry = newSetFromMap(new ConcurrentHashMap<AMQConnectionModel, Boolean>());
+ private final Collection<RegistryChangeListener> _listeners = new ArrayList<>();
+
+ @Override
public void initialise()
{
// None required
}
/** Close all of the currently open connections. */
+ @Override
public void close()
{
close(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT);
}
+ @Override
public void close(final String replyText)
{
- synchronized(this)
- {
- for(AMQConnectionModel conn : _registry)
- {
- conn.stop();
- }
- }
if (_logger.isDebugEnabled())
{
_logger.debug("Closing connection registry :" + _registry.size() + " connections.");
}
- while (!_registry.isEmpty())
+ for(AMQConnectionModel conn : _registry)
{
- AMQConnectionModel connection = _registry.get(0);
+ conn.stop();
+ }
- try
- {
- connection.close(AMQConstant.CONNECTION_FORCED, replyText);
- }
- catch (Exception e)
+ while (!_registry.isEmpty())
+ {
+ Iterator<AMQConnectionModel> itr = _registry.iterator();
+ while(itr.hasNext())
{
- //remove this connection to ensure that we don't loop forever if it fails to close
- _registry.remove(connection);
-
- _logger.warn("Exception closing connection " + connection.getConnectionId() + " from " + connection.getRemoteAddressString(), e);
+ AMQConnectionModel connection = itr.next();
+ try
+ {
+ connection.close(AMQConstant.CONNECTION_FORCED, replyText);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Exception closing connection " + connection.getConnectionId() + " from " + connection.getRemoteAddressString(), e);
+ }
+ finally
+ {
+ itr.remove();
+ }
}
}
}
+ @Override
public void registerConnection(AMQConnectionModel connection)
{
- synchronized (this)
+ _registry.add(connection);
+ synchronized (_listeners)
{
- _registry.add(connection);
- synchronized (_listeners)
+ for(RegistryChangeListener listener : _listeners)
{
- for(RegistryChangeListener listener : _listeners)
- {
- listener.connectionRegistered(connection);
- }
+ listener.connectionRegistered(connection);
}
}
}
+ @Override
public void deregisterConnection(AMQConnectionModel connection)
{
- synchronized (this)
- {
- _registry.remove(connection);
+ _registry.remove(connection);
- synchronized (_listeners)
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
{
- for(RegistryChangeListener listener : _listeners)
- {
- listener.connectionUnregistered(connection);
- }
+ listener.connectionUnregistered(connection);
}
}
}
+ @Override
public void addRegistryChangeListener(RegistryChangeListener listener)
{
synchronized (_listeners)
@@ -119,11 +124,9 @@ public class ConnectionRegistry implements IConnectionRegistry
}
}
+ @Override
public List<AMQConnectionModel> getConnections()
{
- synchronized (this)
- {
- return new ArrayList<AMQConnectionModel>(_registry);
- }
+ return new ArrayList<>(_registry);
}
}