From 23ed5eb6dbd7cf39cf18b308a0d4e339c4db1c83 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 16 Sep 2009 10:07:44 +0000 Subject: QPID-2106: Don't close connections if the broker has asked it to close and there's still stuff to process. Let the cleanup thread do that so that publishes which are denied don't result in instant connection death. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815705 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/connection/ConnectionRegistry.java | 8 +++++ .../qpid/server/protocol/AMQProtocolEngine.java | 34 ++++++++++++++-------- .../qpid/server/protocol/AMQProtocolSession.java | 2 ++ .../qpid/server/virtualhost/VirtualHost.java | 8 +++++ 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 71e07172ed..7b50a2e3ad 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -44,6 +44,14 @@ public class ConnectionRegistry implements IConnectionRegistry { } + + public void expireClosedChannels() + { + for (AMQProtocolSession connection : _registry) + { + connection.closeIfLingeringClosedChannels(); + } + } /** Close all of the currently open connections. */ public void close() throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 16ebc76185..4f396015a2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; @@ -135,7 +137,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private FieldTable _clientProperties; private final List _taskList = new CopyOnWriteArrayList(); - private List _closingChannelsList = new CopyOnWriteArrayList(); + private Map _closingChannelsList = new ConcurrentHashMap(); private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; private MethodDispatcher _dispatcher; @@ -293,12 +295,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol } else { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); - } - - closeProtocolSession(); + // The channel has been told to close, we don't process any more frames until + // it's closed. return; } } @@ -513,7 +511,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public boolean channelAwaitingClosure(int channelId) { - return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId); + return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } public void addChannel(AMQChannel channel) throws AMQException @@ -525,7 +523,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol final int channelId = channel.getChannelId(); - if (_closingChannelsList.contains(channelId)) + if (_closingChannelsList.containsKey(channelId)) { throw new AMQException("Session is marked awaiting channel close"); } @@ -632,7 +630,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private void markChannelAwaitingCloseOk(int channelId) { - _closingChannelsList.add(channelId); + _closingChannelsList.put(channelId, System.currentTimeMillis()); } /** @@ -1023,7 +1021,19 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { return (_clientVersion == null) ? null : _clientVersion.toString(); } - - + + @Override + public void closeIfLingeringClosedChannels() + { + for (Entryid : _closingChannelsList.entrySet()) + { + if (id.getValue() + 30000 > System.currentTimeMillis()) + { + // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection + _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); + closeProtocolSession(); + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index b0bef04354..b16ed01c79 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -225,5 +225,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession void commitTransactions(AMQChannel channel) throws AMQException; List getChannels(); + + void closeIfLingeringClosedChannels(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index fa6b2285eb..aec437b700 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -267,6 +267,14 @@ public class VirtualHost implements Accessable _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), period / 2, period); + + class ForceChannelClosuresTask extends TimerTask + { + public void run() + { + _connectionRegistry.expireClosedChannels(); + } + } } } -- cgit v1.2.1