summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java58
1 files changed, 33 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 5a208aaeaf..e4268ed2dc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -20,8 +20,28 @@
*/
package org.apache.qpid.server.transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
+
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -65,27 +85,8 @@ import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
-
-import javax.security.auth.Subject;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ServerSession extends Session
implements AuthorizationHolder, SessionConfig,
@@ -102,7 +103,7 @@ public class ServerSession extends Session
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+ private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
@@ -684,7 +685,8 @@ public class ServerSession extends Session
public void block(AMQQueue queue)
{
- if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+
+ if(_blockingQueues.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -897,4 +899,10 @@ public class ServerSession extends Session
return _future.isComplete();
}
}
+
+ @Override
+ public int compareTo(AMQSessionModel session)
+ {
+ return getId().toString().compareTo(session.getID().toString());
+ }
}