summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java8
4 files changed, 40 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 71e07172ed..7b50a2e3ad 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -44,6 +44,14 @@ public class ConnectionRegistry implements IConnectionRegistry
{
}
+
+ public void expireClosedChannels()
+ {
+ for (AMQProtocolSession connection : _registry)
+ {
+ connection.closeIfLingeringClosedChannels();
+ }
+ }
/** Close all of the currently open connections. */
public void close() throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 16ebc76185..4f396015a2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -29,6 +29,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
@@ -135,7 +137,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
+ private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
@@ -293,12 +295,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
}
else
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
- }
-
- closeProtocolSession();
+ // The channel has been told to close, we don't process any more frames until
+ // it's closed.
return;
}
}
@@ -513,7 +511,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public boolean channelAwaitingClosure(int channelId)
{
- return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
@@ -525,7 +523,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
final int channelId = channel.getChannelId();
- if (_closingChannelsList.contains(channelId))
+ if (_closingChannelsList.containsKey(channelId))
{
throw new AMQException("Session is marked awaiting channel close");
}
@@ -632,7 +630,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private void markChannelAwaitingCloseOk(int channelId)
{
- _closingChannelsList.add(channelId);
+ _closingChannelsList.put(channelId, System.currentTimeMillis());
}
/**
@@ -1023,7 +1021,19 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
return (_clientVersion == null) ? null : _clientVersion.toString();
}
-
-
+
+ @Override
+ public void closeIfLingeringClosedChannels()
+ {
+ for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+ {
+ if (id.getValue() + 30000 > System.currentTimeMillis())
+ {
+ // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
+ _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+ closeProtocolSession();
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index b0bef04354..b16ed01c79 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -225,5 +225,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
void commitTransactions(AMQChannel channel) throws AMQException;
List<AMQChannel> getChannels();
+
+ void closeIfLingeringClosedChannels();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index fa6b2285eb..aec437b700 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -267,6 +267,14 @@ public class VirtualHost implements Accessable
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
period / 2,
period);
+
+ class ForceChannelClosuresTask extends TimerTask
+ {
+ public void run()
+ {
+ _connectionRegistry.expireClosedChannels();
+ }
+ }
}
}