diff options
author | Keith Wall <kwall@apache.org> | 2012-02-09 17:34:24 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-02-09 17:34:24 +0000 |
commit | 1c21d7c23b4a5e54d824e4861dd7aca511340b66 (patch) | |
tree | b7baf8cc7deeb324e45e9ed179ff6e379a154f6e | |
parent | 866664af8c5638d766e4b98af3aa89e8dbfe04df (diff) | |
download | qpid-python-1c21d7c23b4a5e54d824e4861dd7aca511340b66.tar.gz |
QPID-3823: ServerSession unblock(AMQQueue) can cause NPE when trying to remove a queue from _blockingQueues Map that is not present
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Philip Harvey <phil@philharveyonline.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242408 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 85 insertions, 67 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 46657e2a9a..2266de0ae4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,8 +20,22 @@ */ package org.apache.qpid.server; -import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -79,21 +93,6 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -155,7 +154,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); - private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); private final AtomicBoolean _blocking = new AtomicBoolean(false); @@ -1363,7 +1362,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public void block(AMQQueue queue) { - if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + if(_blockingQueues.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -1616,4 +1615,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } + @Override + public int compareTo(AMQSessionModel session) + { + return getId().toString().compareTo(session.getID().toString()); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 4c603c899a..a80eb46cfa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,11 +20,19 @@ */ package org.apache.qpid.server.protocol; +import java.util.concurrent.ConcurrentSkipListSet; + import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; -public interface AMQSessionModel +/** + * Session model interface. + * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet} + * when monitoring the blocking and blocking of queues/sessions in {@link SimpleAMQQueue}. + */ +public interface AMQSessionModel extends Comparable<AMQSessionModel> { public Object getID(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index c4df3dd092..c6d634fb28 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,8 +18,23 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.JMException; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; @@ -52,22 +67,6 @@ import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.JMException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -165,7 +164,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); - private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>(); + private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); @@ -1629,7 +1628,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes //Overfull log message _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - _blockedChannels.putIfAbsent(channel, Boolean.TRUE); + _blockedChannels.add(channel); channel.block(this); @@ -1662,11 +1661,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); } - - for(AMQSessionModel c : _blockedChannels.keySet()) + for(final AMQSessionModel blockedChannel : _blockedChannels) { - c.unblock(this); - _blockedChannels.remove(c); + blockedChannel.unblock(this); + _blockedChannels.remove(blockedChannel); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 5a208aaeaf..e4268ed2dc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,8 +20,28 @@ */ package org.apache.qpid.server.transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.util.Serial.gt; + +import java.security.Principal; +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -65,27 +85,8 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.util.Serial.gt; - -import javax.security.auth.Subject; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, @@ -102,7 +103,7 @@ public class ServerSession extends Session private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; @@ -684,7 +685,8 @@ public class ServerSession extends Session public void block(AMQQueue queue) { - if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + + if(_blockingQueues.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -897,4 +899,10 @@ public class ServerSession extends Session return _future.isComplete(); } } + + @Override + public int compareTo(AMQSessionModel session) + { + return getId().toString().compareTo(session.getID().toString()); + } } |