diff options
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java | 14 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java | 188 |
2 files changed, 186 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 4c65a4682a..4267642b14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -227,7 +227,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return; } } - + try @@ -356,7 +356,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } catch (Exception e) { - + for (AMQMethodListener listener : _frameListeners) { listener.error(e); @@ -548,7 +548,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void closeChannelOk(int channelId) { - removeChannel(channelId); + // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler. + // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory. + // We do it from the Close Handler as we are sending the OK back to the client. + // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException + // will send a close-ok.. Where we should call removeChannel. + // However, due to the poor exception handling on the client. The client-user will be notified of the + // InvalidArgument and if they then decide to close the session/connection then the there will be time + // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed. + //removeChannel(channelId); _closingChannelsList.remove(new Integer(channelId)); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index e0bfc5d498..d05ed7ca73 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.basic; import junit.framework.TestCase; - +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.transport.TransportConnection; - +import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -46,12 +48,12 @@ public class SelectorTest extends TestCase implements MessageListener private AMQSession _session; private int count; public String _connectionString = "vm://:1"; + private static final String INVALID_SELECTOR = "Cost LIKE 5"; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception @@ -60,19 +62,19 @@ public class SelectorTest extends TestCase implements MessageListener TransportConnection.killAllVMBrokers(); } - private void init(AMQConnection connection) throws Exception + private void init(AMQConnection connection) throws JMSException { init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true)); } - private void init(AMQConnection connection, AMQDestination destination) throws Exception + private void init(AMQConnection connection, AMQDestination destination) throws JMSException { _connection = connection; _destination = destination; connection.start(); String selector = null; - selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; + selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -80,14 +82,17 @@ public class SelectorTest extends TestCase implements MessageListener _session.createConsumer(destination, selector).setMessageListener(this); } - public synchronized void test() throws JMSException, InterruptedException + public synchronized void test() throws JMSException, InterruptedException, URLSyntaxException, AMQException { try { + + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + Message msg = _session.createTextMessage("Message"); msg.setJMSPriority(1); msg.setIntProperty("Cost", 2); - msg.setStringProperty("property-with-hyphen","wibble"); + msg.setStringProperty("property-with-hyphen", "wibble"); msg.setJMSType("Special"); _logger.info("Sending Message:" + msg); @@ -107,10 +112,147 @@ public class SelectorTest extends TestCase implements MessageListener // throw new RuntimeException("Did not get message!"); } } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + System.out.println("SUCCESS!!"); + } + } + catch (InterruptedException e) + { + _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage()); + } + catch (URLSyntaxException e) + { + _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + catch (AMQException e) + { + _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + + finally + { + if (_session != null) + { + _session.close(); + } + if (_connection != null) + { + _connection.close(); + } + } + } + + + public void testInvalidSelectors() + { + Connection connection = null; + + try + { + connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"); + _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + } + catch (JMSException e) + { + fail(e.getMessage()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + catch (URLSyntaxException e) + { + fail("Error:" + e.getMessage()); + } + + //Try Creating a Browser + try + { + _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Consumer + try + { + _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Receiever + try + { + _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + finally { - _session.close(); - _connection.close(); + if (_session != null) + { + try + { + _session.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } + if (_connection != null) + { + try + { + _connection.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } } } @@ -129,9 +271,29 @@ public class SelectorTest extends TestCase implements MessageListener public static void main(String[] argv) throws Exception { SelectorTest test = new SelectorTest(); - test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0]; - test.setUp(); - test.test(); + test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0]; + + try + { + while (true) + { + if (test._connectionString.contains("vm://:1")) + { + test.setUp(); + } + test.test(); + + if (test._connectionString.contains("vm://:1")) + { + test.tearDown(); + } + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + e.printStackTrace(); + } } public static junit.framework.Test suite() |