summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-02-09 17:34:24 +0000
committerKeith Wall <kwall@apache.org>2012-02-09 17:34:24 +0000
commit1c21d7c23b4a5e54d824e4861dd7aca511340b66 (patch)
treeb7baf8cc7deeb324e45e9ed179ff6e379a154f6e
parent866664af8c5638d766e4b98af3aa89e8dbfe04df (diff)
downloadqpid-python-1c21d7c23b4a5e54d824e4861dd7aca511340b66.tar.gz
QPID-3823: ServerSession unblock(AMQQueue) can cause NPE when trying to remove a queue from _blockingQueues Map that is not present
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Philip Harvey <phil@philharveyonline.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242408 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java58
4 files changed, 85 insertions, 67 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 46657e2a9a..2266de0ae4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,8 +20,22 @@
*/
package org.apache.qpid.server;
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -79,21 +93,6 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -155,7 +154,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
- private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+ private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
private final AtomicBoolean _blocking = new AtomicBoolean(false);
@@ -1363,7 +1362,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
public void block(AMQQueue queue)
{
- if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ if(_blockingQueues.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -1616,4 +1615,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
+ @Override
+ public int compareTo(AMQSessionModel session)
+ {
+ return getId().toString().compareTo(session.getID().toString());
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 4c603c899a..a80eb46cfa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -20,11 +20,19 @@
*/
package org.apache.qpid.server.protocol;
+import java.util.concurrent.ConcurrentSkipListSet;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
-public interface AMQSessionModel
+/**
+ * Session model interface.
+ * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
+ * when monitoring the blocking and blocking of queues/sessions in {@link SimpleAMQQueue}.
+ */
+public interface AMQSessionModel extends Comparable<AMQSessionModel>
{
public Object getID();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index c4df3dd092..c6d634fb28 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -18,8 +18,23 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
@@ -52,22 +67,6 @@ import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.JMException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -165,7 +164,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
+ private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -1629,7 +1628,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
//Overfull log message
_logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
- _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+ _blockedChannels.add(channel);
channel.block(this);
@@ -1662,11 +1661,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
}
-
- for(AMQSessionModel c : _blockedChannels.keySet())
+ for(final AMQSessionModel blockedChannel : _blockedChannels)
{
- c.unblock(this);
- _blockedChannels.remove(c);
+ blockedChannel.unblock(this);
+ _blockedChannels.remove(blockedChannel);
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 5a208aaeaf..e4268ed2dc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -20,8 +20,28 @@
*/
package org.apache.qpid.server.transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
+
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -65,27 +85,8 @@ import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
-
-import javax.security.auth.Subject;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ServerSession extends Session
implements AuthorizationHolder, SessionConfig,
@@ -102,7 +103,7 @@ public class ServerSession extends Session
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+ private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
@@ -684,7 +685,8 @@ public class ServerSession extends Session
public void block(AMQQueue queue)
{
- if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+
+ if(_blockingQueues.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -897,4 +899,10 @@ public class ServerSession extends Session
return _future.isComplete();
}
}
+
+ @Override
+ public int compareTo(AMQSessionModel session)
+ {
+ return getId().toString().compareTo(session.getID().toString());
+ }
}