From 1c21d7c23b4a5e54d824e4861dd7aca511340b66 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 9 Feb 2012 17:34:24 +0000 Subject: QPID-3823: ServerSession unblock(AMQQueue) can cause NPE when trying to remove a queue from _blockingQueues Map that is not present Applied patch from Andrew MacBean and Philip Harvey git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242408 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 40 ++++++++------- .../qpid/server/protocol/AMQSessionModel.java | 10 +++- .../apache/qpid/server/queue/SimpleAMQQueue.java | 44 ++++++++-------- .../qpid/server/transport/ServerSession.java | 58 ++++++++++++---------- 4 files changed, 85 insertions(+), 67 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 46657e2a9a..2266de0ae4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,8 +20,22 @@ */ package org.apache.qpid.server; -import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -79,21 +93,6 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder { public static final int DEFAULT_PREFETCH = 4096; @@ -155,7 +154,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); - private final ConcurrentMap _blockingQueues = new ConcurrentHashMap(); + private final Set _blockingQueues = new ConcurrentSkipListSet(); private final AtomicBoolean _blocking = new AtomicBoolean(false); @@ -1363,7 +1362,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public void block(AMQQueue queue) { - if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + if(_blockingQueues.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -1616,4 +1615,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } + @Override + public int compareTo(AMQSessionModel session) + { + return getId().toString().compareTo(session.getID().toString()); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 4c603c899a..a80eb46cfa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,11 +20,19 @@ */ package org.apache.qpid.server.protocol; +import java.util.concurrent.ConcurrentSkipListSet; + import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; -public interface AMQSessionModel +/** + * Session model interface. + * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet} + * when monitoring the blocking and blocking of queues/sessions in {@link SimpleAMQQueue}. + */ +public interface AMQSessionModel extends Comparable { public Object getID(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index c4df3dd092..c6d634fb28 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -18,8 +18,23 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.JMException; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; @@ -52,22 +67,6 @@ import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.JMException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -165,7 +164,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); - private final ConcurrentMap _blockedChannels = new ConcurrentHashMap(); + private final Set _blockedChannels = new ConcurrentSkipListSet(); private final AtomicBoolean _deleted = new AtomicBoolean(false); private final List _deleteTaskList = new CopyOnWriteArrayList(); @@ -1629,7 +1628,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes //Overfull log message _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - _blockedChannels.putIfAbsent(channel, Boolean.TRUE); + _blockedChannels.add(channel); channel.block(this); @@ -1662,11 +1661,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); } - - for(AMQSessionModel c : _blockedChannels.keySet()) + for(final AMQSessionModel blockedChannel : _blockedChannels) { - c.unblock(this); - _blockedChannels.remove(c); + blockedChannel.unblock(this); + _blockedChannels.remove(blockedChannel); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 5a208aaeaf..e4268ed2dc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,8 +20,28 @@ */ package org.apache.qpid.server.transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.util.Serial.gt; + +import java.security.Principal; +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -65,27 +85,8 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.util.Serial.gt; - -import javax.security.auth.Subject; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, @@ -102,7 +103,7 @@ public class ServerSession extends Session private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private final ConcurrentMap _blockingQueues = new ConcurrentHashMap(); + private final Set _blockingQueues = new ConcurrentSkipListSet(); private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; @@ -684,7 +685,8 @@ public class ServerSession extends Session public void block(AMQQueue queue) { - if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + + if(_blockingQueues.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -897,4 +899,10 @@ public class ServerSession extends Session return _future.isComplete(); } } + + @Override + public int compareTo(AMQSessionModel session) + { + return getId().toString().compareTo(session.getID().toString()); + } } -- cgit v1.2.1