summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java14
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java188
2 files changed, 186 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 4c65a4682a..4267642b14 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -227,7 +227,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return;
}
}
-
+
try
@@ -356,7 +356,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
catch (Exception e)
{
-
+
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
@@ -548,7 +548,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void closeChannelOk(int channelId)
{
- removeChannel(channelId);
+ // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler.
+ // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory.
+ // We do it from the Close Handler as we are sending the OK back to the client.
+ // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
+ // will send a close-ok.. Where we should call removeChannel.
+ // However, due to the poor exception handling on the client. The client-user will be notified of the
+ // InvalidArgument and if they then decide to close the session/connection then the there will be time
+ // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed.
+ //removeChannel(channelId);
_closingChannelsList.remove(new Integer(channelId));
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
index e0bfc5d498..d05ed7ca73 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -21,18 +21,20 @@
package org.apache.qpid.test.unit.basic;
import junit.framework.TestCase;
-
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.transport.TransportConnection;
-
+import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -46,12 +48,12 @@ public class SelectorTest extends TestCase implements MessageListener
private AMQSession _session;
private int count;
public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
@@ -60,19 +62,19 @@ public class SelectorTest extends TestCase implements MessageListener
TransportConnection.killAllVMBrokers();
}
- private void init(AMQConnection connection) throws Exception
+ private void init(AMQConnection connection) throws JMSException
{
init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true));
}
- private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
{
_connection = connection;
_destination = destination;
connection.start();
String selector = null;
- selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
+ selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
_session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -80,14 +82,17 @@ public class SelectorTest extends TestCase implements MessageListener
_session.createConsumer(destination, selector).setMessageListener(this);
}
- public synchronized void test() throws JMSException, InterruptedException
+ public synchronized void test() throws JMSException, InterruptedException, URLSyntaxException, AMQException
{
try
{
+
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+
Message msg = _session.createTextMessage("Message");
msg.setJMSPriority(1);
msg.setIntProperty("Cost", 2);
- msg.setStringProperty("property-with-hyphen","wibble");
+ msg.setStringProperty("property-with-hyphen", "wibble");
msg.setJMSType("Special");
_logger.info("Sending Message:" + msg);
@@ -107,10 +112,147 @@ public class SelectorTest extends TestCase implements MessageListener
// throw new RuntimeException("Did not get message!");
}
}
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+ catch (URLSyntaxException e)
+ {
+ _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ fail("Wrong exception");
+ }
+ catch (AMQException e)
+ {
+ _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ fail("Wrong exception");
+ }
+
+ finally
+ {
+ if (_session != null)
+ {
+ _session.close();
+ }
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ }
+
+
+ public void testInvalidSelectors()
+ {
+ Connection connection = null;
+
+ try
+ {
+ connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test");
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+ catch (JMSException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail("Error:" + e.getMessage());
+ }
+
+ //Try Creating a Browser
+ try
+ {
+ _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
+ //Try Creating a Consumer
+ try
+ {
+ _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
+ //Try Creating a Receiever
+ try
+ {
+ _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
finally
{
- _session.close();
- _connection.close();
+ if (_session != null)
+ {
+ try
+ {
+ _session.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Error cleaning up:" + e.getMessage());
+ }
+ }
+ if (_connection != null)
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Error cleaning up:" + e.getMessage());
+ }
+ }
}
}
@@ -129,9 +271,29 @@ public class SelectorTest extends TestCase implements MessageListener
public static void main(String[] argv) throws Exception
{
SelectorTest test = new SelectorTest();
- test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0];
- test.setUp();
- test.test();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.test();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
}
public static junit.framework.Test suite()