diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java | 51 |
1 files changed, 27 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 3786c2020c..bac751e0c8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -20,19 +20,19 @@ */ package org.apache.qpid.server.connection; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQProtocolSession; public class ConnectionRegistry implements IConnectionRegistry, Closeable { - private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>(); + private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>(); private Logger _logger = Logger.getLogger(ConnectionRegistry.class); @@ -40,41 +40,44 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable { // None required } - - /** Close all of the currently open connections. */ - public void close() + + public void expireClosedChannels() { - while (!_registry.isEmpty()) + for (AMQProtocolSession connection : _registry) { - AMQConnectionModel connection = _registry.get(0); - closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down"); + connection.closeIfLingeringClosedChannels(); } } - - public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message) + + /** Close all of the currently open connections. */ + public void close() { - try - { - connection.close(cause, message); - } - catch (AMQException e) + while (!_registry.isEmpty()) { - _logger.warn("Error closing connection:" + e.getMessage()); + AMQProtocolSession connection = _registry.get(0); + + try + { + connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down", + 0, 0, + connection.getProtocolOutputConverter().getProtocolMajorVersion(), + connection.getProtocolOutputConverter().getProtocolMinorVersion(), + (Throwable) null), true); + } + catch (AMQException e) + { + _logger.warn("Error closing connection:" + e.getMessage()); + } } } - public void registerConnection(AMQConnectionModel connnection) + public void registerConnection(AMQProtocolSession connnection) { _registry.add(connnection); } - public void deregisterConnection(AMQConnectionModel connnection) + public void deregisterConnection(AMQProtocolSession connnection) { _registry.remove(connnection); } - - public List<AMQConnectionModel> getConnections() - { - return new ArrayList<AMQConnectionModel>(_registry); - } } |