diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-03-14 10:51:40 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-03-14 10:51:40 +0000 |
commit | c57e80ac3fe600ee2969b00c1d81bfd2cfe0cbd0 (patch) | |
tree | 0ae699313c64bdd79f853b16701e4c1d9cc8a6f0 | |
parent | 20cf766ec6465c52c56984780256791d97f481ac (diff) | |
download | qpid-python-c57e80ac3fe600ee2969b00c1d81bfd2cfe0cbd0.tar.gz |
QPID-851 : Update to AMQChannel to prevent the memory leak when an autoclose consumer closes in a second thread before the register gets a chance to add the new session to the map.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637048 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5542fbc9b6..b04f60b1b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -33,7 +33,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -43,17 +43,15 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.configuration.Configurator; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; public class AMQChannel { @@ -91,7 +89,7 @@ public class AMQChannel private AMQMessage _currentMessage; /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>(); + private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private final MessageStore _messageStore; @@ -333,9 +331,21 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); + // We add before we register as the Async Delivery process may AutoClose the subscriber + // so calling _cT2QM.remove before we have done put which was after the register succeeded. + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. _consumerTag2QueueMap.put(tag, queue); + try + { + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); + } + catch (AMQException e) + { + _consumerTag2QueueMap.remove(tag); + throw e; + } + return tag; } @@ -822,7 +832,7 @@ public class AMQChannel { message.discard(_storeContext); message.setQueueDeleted(true); - + } catch (AMQException e) { @@ -967,7 +977,7 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) throws AMQException { - if(!_returnMessages.isEmpty()) + if (!_returnMessages.isEmpty()) { for (RequiredDeliveryException bouncedMessage : _returnMessages) { |