summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-03-14 10:51:40 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-03-14 10:51:40 +0000
commitc57e80ac3fe600ee2969b00c1d81bfd2cfe0cbd0 (patch)
tree0ae699313c64bdd79f853b16701e4c1d9cc8a6f0
parent20cf766ec6465c52c56984780256791d97f481ac (diff)
downloadqpid-python-c57e80ac3fe600ee2969b00c1d81bfd2cfe0cbd0.tar.gz
QPID-851 : Update to AMQChannel to prevent the memory leak when an autoclose consumer closes in a second thread before the register gets a chance to add the new session to the map.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637048 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java26
1 files changed, 18 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5542fbc9b6..b04f60b1b0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -33,7 +33,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -43,17 +43,15 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.configuration.Configurator;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
public class AMQChannel
{
@@ -91,7 +89,7 @@ public class AMQChannel
private AMQMessage _currentMessage;
/** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
+ private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -333,9 +331,21 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
+ // We add before we register as the Async Delivery process may AutoClose the subscriber
+ // so calling _cT2QM.remove before we have done put which was after the register succeeded.
+ // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
_consumerTag2QueueMap.put(tag, queue);
+ try
+ {
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
+ }
+ catch (AMQException e)
+ {
+ _consumerTag2QueueMap.remove(tag);
+ throw e;
+ }
+
return tag;
}
@@ -822,7 +832,7 @@ public class AMQChannel
{
message.discard(_storeContext);
message.setQueueDeleted(true);
-
+
}
catch (AMQException e)
{
@@ -967,7 +977,7 @@ public class AMQChannel
public void processReturns(AMQProtocolSession session) throws AMQException
{
- if(!_returnMessages.isEmpty())
+ if (!_returnMessages.isEmpty())
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{