summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-16 10:07:44 +0000
committerAidan Skinner <aidan@apache.org>2009-09-16 10:07:44 +0000
commit23ed5eb6dbd7cf39cf18b308a0d4e339c4db1c83 (patch)
tree55727c0c91521d8be6bbe99fd5baf60d07e50009
parent9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 (diff)
downloadqpid-python-23ed5eb6dbd7cf39cf18b308a0d4e339c4db1c83.tar.gz
QPID-2106: Don't close connections if the broker has asked it to close and
there's still stuff to process. Let the cleanup thread do that so that publishes which are denied don't result in instant connection death. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815705 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java8
4 files changed, 40 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 71e07172ed..7b50a2e3ad 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -44,6 +44,14 @@ public class ConnectionRegistry implements IConnectionRegistry
{
}
+
+ public void expireClosedChannels()
+ {
+ for (AMQProtocolSession connection : _registry)
+ {
+ connection.closeIfLingeringClosedChannels();
+ }
+ }
/** Close all of the currently open connections. */
public void close() throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 16ebc76185..4f396015a2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -29,6 +29,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
@@ -135,7 +137,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
+ private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
@@ -293,12 +295,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
}
else
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
- }
-
- closeProtocolSession();
+ // The channel has been told to close, we don't process any more frames until
+ // it's closed.
return;
}
}
@@ -513,7 +511,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public boolean channelAwaitingClosure(int channelId)
{
- return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
@@ -525,7 +523,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
final int channelId = channel.getChannelId();
- if (_closingChannelsList.contains(channelId))
+ if (_closingChannelsList.containsKey(channelId))
{
throw new AMQException("Session is marked awaiting channel close");
}
@@ -632,7 +630,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private void markChannelAwaitingCloseOk(int channelId)
{
- _closingChannelsList.add(channelId);
+ _closingChannelsList.put(channelId, System.currentTimeMillis());
}
/**
@@ -1023,7 +1021,19 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
return (_clientVersion == null) ? null : _clientVersion.toString();
}
-
-
+
+ @Override
+ public void closeIfLingeringClosedChannels()
+ {
+ for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+ {
+ if (id.getValue() + 30000 > System.currentTimeMillis())
+ {
+ // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
+ _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+ closeProtocolSession();
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index b0bef04354..b16ed01c79 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -225,5 +225,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
void commitTransactions(AMQChannel channel) throws AMQException;
List<AMQChannel> getChannels();
+
+ void closeIfLingeringClosedChannels();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index fa6b2285eb..aec437b700 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -267,6 +267,14 @@ public class VirtualHost implements Accessable
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
period / 2,
period);
+
+ class ForceChannelClosuresTask extends TimerTask
+ {
+ public void run()
+ {
+ _connectionRegistry.expireClosedChannels();
+ }
+ }
}
}