diff options
9 files changed, 167 insertions, 5 deletions
diff --git a/java/010ExcludeList b/java/010ExcludeList index ae86d46b5b..0b1993e71a 100644 --- a/java/010ExcludeList +++ b/java/010ExcludeList @@ -69,3 +69,7 @@ org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* //QPID-1818 : 0-10 Client code path does not correctly restore a transacted session after failover. org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* + +// QPID-1823: this takes ages to run +org.apache.qpid.client.SessionCreateTest#* + diff --git a/java/08ExcludeList b/java/08ExcludeList index fae59a07d0..0dde433b10 100644 --- a/java/08ExcludeList +++ b/java/08ExcludeList @@ -10,3 +10,6 @@ org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait //QPID-1818 : Client code path does not correctly restore a transacted session after failover. org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* + +// QPID-1823: this takes ages to run +org.apache.qpid.client.SessionCreateTest#* diff --git a/java/08ExcludeList-nonvm b/java/08ExcludeList-nonvm index bce674b37c..aaec56d7b8 100644 --- a/java/08ExcludeList-nonvm +++ b/java/08ExcludeList-nonvm @@ -36,3 +36,6 @@ org.apache.qpid.test.unit.close.FlowToDiskBackingQueueDeleteTest#* org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* //QPID-1818 : Client code path does not correctly restore a transacted session after failover. org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* +// QPID-1823: this takes ages to run +org.apache.qpid.client.SessionCreateTest#* + diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 39acee3a60..e6968100f3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -90,6 +90,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); private int _size = 0; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private AtomicInteger _idFactory = new AtomicInteger(0); + private int _maxChannelID; + private boolean _cycledIds; public AMQSession get(int channelId) { @@ -179,11 +182,57 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _fastAccessSessions[i] = null; } } + + /* + * Synchronized on whole method so that we don't need to consider the + * increment-then-reset path in too much detail + */ + public synchronized int getNextChannelId() + { + int id = 0; + if (!_cycledIds) + { + id = _idFactory.incrementAndGet(); + if (id == _maxChannelID) + { + _cycledIds = true; + _idFactory.set(0); // Go back to the start + } + } + else + { + boolean done = false; + while (!done) + { + // Needs to work second time through + id = _idFactory.incrementAndGet(); + if (id > _maxChannelID) + { + _idFactory.set(0); + id = _idFactory.incrementAndGet(); + } + if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) + { + done = (_fastAccessSessions[id] == null); + } + else + { + done = (!_slowAccessSessions.keySet().contains(id)); + } + } + } + + return id; + } + + public void setMaxChannelID(int maxChannelID) + { + _maxChannelID = maxChannelID; + } } private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); - protected AtomicInteger _idFactory = new AtomicInteger(0); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -415,6 +464,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate = new AMQConnectionDelegate_0_10(this); } + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -567,6 +617,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); } catch (ClassNotFoundException e) { @@ -1395,7 +1446,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _sessions.put(channelId, session); } - void deregisterSession(int channelId) + public void deregisterSession(int channelId) { _sessions.remove(channelId); } @@ -1540,4 +1591,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _delegate.setIdleTimeout(l); } + + public int getNextChannelID() + { + return _sessions.getNextChannelId(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index cec840f5c6..e5980d8b7d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -50,4 +50,6 @@ public interface AMQConnectionDelegate <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; void setIdleTimeout(long l); + + int getMaxChannelID(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index c2fb05d94e..927929c94a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -79,7 +79,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); AMQSession session; try { @@ -105,7 +105,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); XASessionImpl session; try { @@ -284,4 +284,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _qpidConnection.setIdleTimeout(l); } + + @Override + public int getMaxChannelID() + { + return Integer.MAX_VALUE; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 17090875a7..a0b69b5493 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -138,7 +138,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); if (_logger.isDebugEnabled()) { @@ -289,4 +289,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } public void setIdleTimeout(long l){} + + @Override + public int getMaxChannelID() + { + return (int) (Math.pow(2, 16)-1); + } } diff --git a/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java b/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java new file mode 100644 index 0000000000..1672c2a828 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.naming.Context; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class to check that session creation on a connection has no accidental limit + */ +public class SessionCreateTest extends QpidTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); + + Context _context; + + private Connection _clientConnection; + protected int maxSessions = 65555; + + public void testSessionCreationLimit() throws Exception + { + // Create Client + _clientConnection = getConnection("guest", "guest"); + + _clientConnection.start(); + + for (int i=0; i < maxSessions; i++) + { + Session sess = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(sess); + sess.close(); + System.out.println("created session: " + i); + } + + _clientConnection.close(); + + } + +}
\ No newline at end of file diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 55750dcafb..cbeb16f340 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -30,6 +30,7 @@ import javax.jms.TextMessage; import javax.jms.TopicSession; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionDelegate_0_10; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; @@ -244,6 +245,24 @@ public class AMQConnectionTest extends QpidTestCase } } + public void testGetChannelID() + { + int maxChannelID = 65536; + if (isBroker010()) + { + maxChannelID = Integer.MAX_VALUE+1; + } + for (int j = 0; j < 3; j++) + { + for (int i = 1; i < maxChannelID; i++) + { + int id = _connection.getNextChannelID(); + assertEquals("On iterartion "+j, i, id); + _connection.deregisterSession(id); + } + } + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(AMQConnectionTest.class); |