diff options
4 files changed, 40 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 71e07172ed..7b50a2e3ad 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -44,6 +44,14 @@ public class ConnectionRegistry implements IConnectionRegistry { } + + public void expireClosedChannels() + { + for (AMQProtocolSession connection : _registry) + { + connection.closeIfLingeringClosedChannels(); + } + } /** Close all of the currently open connections. */ public void close() throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 16ebc76185..4f396015a2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; @@ -135,7 +137,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; private MethodDispatcher _dispatcher; @@ -293,12 +295,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol } else { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); - } - - closeProtocolSession(); + // The channel has been told to close, we don't process any more frames until + // it's closed. return; } } @@ -513,7 +511,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public boolean channelAwaitingClosure(int channelId) { - return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId); + return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } public void addChannel(AMQChannel channel) throws AMQException @@ -525,7 +523,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol final int channelId = channel.getChannelId(); - if (_closingChannelsList.contains(channelId)) + if (_closingChannelsList.containsKey(channelId)) { throw new AMQException("Session is marked awaiting channel close"); } @@ -632,7 +630,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private void markChannelAwaitingCloseOk(int channelId) { - _closingChannelsList.add(channelId); + _closingChannelsList.put(channelId, System.currentTimeMillis()); } /** @@ -1023,7 +1021,19 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { return (_clientVersion == null) ? null : _clientVersion.toString(); } - - + + @Override + public void closeIfLingeringClosedChannels() + { + for (Entry<Integer, Long>id : _closingChannelsList.entrySet()) + { + if (id.getValue() + 30000 > System.currentTimeMillis()) + { + // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection + _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); + closeProtocolSession(); + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index b0bef04354..b16ed01c79 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -225,5 +225,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession void commitTransactions(AMQChannel channel) throws AMQException; List<AMQChannel> getChannels(); + + void closeIfLingeringClosedChannels(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index fa6b2285eb..aec437b700 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -267,6 +267,14 @@ public class VirtualHost implements Accessable _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), period / 2, period); + + class ForceChannelClosuresTask extends TimerTask + { + public void run() + { + _connectionRegistry.expireClosedChannels(); + } + } } } |