diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 58 |
1 files changed, 33 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 5a208aaeaf..e4268ed2dc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,8 +20,28 @@ */ package org.apache.qpid.server.transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.util.Serial.gt; + +import java.security.Principal; +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -65,27 +85,8 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import static org.apache.qpid.util.Serial.gt; - -import javax.security.auth.Subject; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, @@ -102,7 +103,7 @@ public class ServerSession extends Session private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; @@ -684,7 +685,8 @@ public class ServerSession extends Session public void block(AMQQueue queue) { - if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + + if(_blockingQueues.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -897,4 +899,10 @@ public class ServerSession extends Session return _future.isComplete(); } } + + @Override + public int compareTo(AMQSessionModel session) + { + return getId().toString().compareTo(session.getID().toString()); + } } |