summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java51
1 files changed, 27 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 3786c2020c..bac751e0c8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -20,19 +20,19 @@
*/
package org.apache.qpid.server.connection;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
- private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
+ private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>();
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
@@ -40,41 +40,44 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
// None required
}
-
- /** Close all of the currently open connections. */
- public void close()
+
+ public void expireClosedChannels()
{
- while (!_registry.isEmpty())
+ for (AMQProtocolSession connection : _registry)
{
- AMQConnectionModel connection = _registry.get(0);
- closeConnection(connection, AMQConstant.INTERNAL_ERROR, "Broker is shutting down");
+ connection.closeIfLingeringClosedChannels();
}
}
-
- public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+
+ /** Close all of the currently open connections. */
+ public void close()
{
- try
- {
- connection.close(cause, message);
- }
- catch (AMQException e)
+ while (!_registry.isEmpty())
{
- _logger.warn("Error closing connection:" + e.getMessage());
+ AMQProtocolSession connection = _registry.get(0);
+
+ try
+ {
+ connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down",
+ 0, 0,
+ connection.getProtocolOutputConverter().getProtocolMajorVersion(),
+ connection.getProtocolOutputConverter().getProtocolMinorVersion(),
+ (Throwable) null), true);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Error closing connection:" + e.getMessage());
+ }
}
}
- public void registerConnection(AMQConnectionModel connnection)
+ public void registerConnection(AMQProtocolSession connnection)
{
_registry.add(connnection);
}
- public void deregisterConnection(AMQConnectionModel connnection)
+ public void deregisterConnection(AMQProtocolSession connnection)
{
_registry.remove(connnection);
}
-
- public List<AMQConnectionModel> getConnections()
- {
- return new ArrayList<AMQConnectionModel>(_registry);
- }
}