summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java239
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java394
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java81
3 files changed, 714 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
new file mode 100644
index 0000000000..79e2ff8148
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import junit.textui.TestRunner;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Due to bizarre exception handling all sessions are closed if you get
+ * a channel close request and no exception listener is registered.
+ * <p/>
+ * JIRA issue IBTBLZ-10.
+ * <p/>
+ * Simulate by:
+ * <p/>
+ * 0. Create two sessions with no exception listener.
+ * 1. Publish message to queue/topic that does not exist (wrong routing key).
+ * 2. This will cause a channel close.
+ * 3. Since client does not have an exception listener, currently all sessions are
+ * closed.
+ */
+public class ChannelCloseOkTest extends QpidBrokerTestCase
+{
+ private AMQConnection _connection;
+ private Destination _destination1;
+ private Destination _destination2;
+ private Session _session1;
+ private Session _session2;
+ private final List<Message> _received1 = new ArrayList<Message>();
+ private final List<Message> _received2 = new ArrayList<Message>();
+
+ private static final Logger _log = LoggerFactory.getLogger(ChannelCloseOkTest.class);
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _connection = (AMQConnection) getConnection("guest", "guest");
+
+ _destination1 = new AMQQueue(_connection, "q1", true);
+ _destination2 = new AMQQueue(_connection, "q2", true);
+ _session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session1.createConsumer(_destination1).setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _log.debug("consumer 1 got message [" + getTextMessage(message) + "]");
+ synchronized (_received1)
+ {
+ _received1.add(message);
+ _received1.notify();
+ }
+ }
+ });
+ _session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session2.createConsumer(_destination2).setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _log.debug("consumer 2 got message [" + getTextMessage(message) + "]");
+ synchronized (_received2)
+ {
+ _received2.add(message);
+ _received2.notify();
+ }
+ }
+ });
+
+ _connection.start();
+ }
+
+ private String getTextMessage(Message message)
+ {
+ TextMessage tm = (TextMessage) message;
+ try
+ {
+ return tm.getText();
+ }
+ catch (JMSException e)
+ {
+ return "oops " + e;
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ closeConnection();
+ super.tearDown();
+ }
+
+ public void closeConnection() throws JMSException
+ {
+ if (_connection != null)
+ {
+ _log.info(">>>>>>>>>>>>>>.. closing");
+ _connection.close();
+ }
+ }
+
+ public void testWithoutExceptionListener() throws Exception
+ {
+ doTest();
+ }
+
+ public void testWithExceptionListener() throws Exception
+ {
+ _connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmsException)
+ {
+ _log.warn("onException - " + jmsException.getMessage());
+ }
+ });
+
+ doTest();
+ }
+
+ public void doTest() throws Exception
+ {
+ // Check both sessions are ok.
+ sendAndWait(_session1, _destination1, "first", _received1, 1);
+ sendAndWait(_session2, _destination2, "second", _received2, 1);
+ assertEquals(1, _received1.size());
+ assertEquals(1, _received2.size());
+
+ // Now send message to incorrect destination on session 1.
+ Destination destination = new AMQQueue(_connection, "incorrect");
+ send(_session1, destination, "third"); // no point waiting as message will never be received.
+
+ // Ensure both sessions are still ok.
+ // Send a bunch of messages as this give time for the sessions to be erroneously closed.
+ final int num = 300;
+ for (int i = 0; i < num; ++i)
+ {
+ send(_session1, _destination1, "" + i);
+ send(_session2, _destination2, "" + i);
+ }
+
+ waitFor(_received1, num + 1);
+ waitFor(_received2, num + 1);
+
+ // Note that the third message is never received as it is sent to an incorrect destination.
+ assertEquals(num + 1, _received1.size());
+ assertEquals(num + 1, _received2.size());
+ }
+
+ private void sendAndWait(Session session, Destination destination, String message, List<Message> received, int count)
+ throws JMSException, InterruptedException
+ {
+ send(session, destination, message);
+ waitFor(received, count);
+ }
+
+ private void send(Session session, Destination destination, String message) throws JMSException
+ {
+ _log.debug("sending message " + message);
+ MessageProducer producer1 = session.createProducer(destination);
+ producer1.send(session.createTextMessage(message));
+ }
+
+ private void waitFor(List<Message> received, int count) throws InterruptedException
+ {
+ long timeout = 20000;
+
+ synchronized (received)
+ {
+ long start = System.currentTimeMillis();
+ while (received.size() < count)
+ {
+ if (System.currentTimeMillis() - start > timeout)
+ {
+ fail("timeout expired waiting for messages");
+ }
+ try
+ {
+ received.wait(timeout);
+ }
+ catch (InterruptedException e)
+ {
+ _log.info("Interrupted: " + e);
+ throw e;
+ }
+
+ }
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] args)
+ {
+ TestRunner.run(ChannelCloseOkTest.class);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(ChannelCloseOkTest.class);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
new file mode 100644
index 0000000000..b6232b1734
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.AMQConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class);
+
+ Connection _connection;
+ private Session _session;
+ private static final long SYNC_TIMEOUT = 500;
+ private int TEST = 0;
+
+ /**
+ * Close channel, use chanel with same id ensure error.
+ *
+ * This test is only valid for non 0-10 connection .
+ */
+ public void testReusingChannelAfterFullClosure() throws Exception
+ {
+ _connection=newConnection();
+
+ // Create Producer
+ try
+ {
+ _connection.start();
+
+ createChannelAndTest(1);
+
+ // Cause it to close
+ try
+ {
+ _logger.info("Testing invalid exchange");
+ declareExchange(1, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+ }
+
+ // Check that
+ try
+ {
+ _logger.info("Testing valid exchange should fail");
+ declareExchange(1, "topic", "amq.topic", false);
+ fail("This should not succeed as the channel should be closed ");
+ }
+ catch (AMQException e)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Exception occured was:" + e.getErrorCode());
+ }
+
+ assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
+
+ _connection=newConnection();
+ }
+
+ checkSendingMessage();
+
+ _session.close();
+ _connection.close();
+
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /*
+ close channel and send guff then send ok no errors
+ REMOVE TEST - The behaviour after server has sent close is undefined.
+ the server should be free to fail as it may wish to reclaim its resources
+ immediately after close.
+ */
+ /*public void testSendingMethodsAfterClose() throws Exception
+ {
+ // this is testing an 0.8 connection
+ if(isBroker08())
+ {
+ try
+ {
+ _connection=new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
+
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ _connection.setExceptionListener(this);
+
+ // Change the StateManager for one that doesn't respond with Close-OKs
+ AMQStateManager oldStateManager=((AMQConnection) _connection).getProtocolHandler().getStateManager();
+
+ _session=_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ _connection.start();
+
+ // Test connection
+ checkSendingMessage();
+
+ // Set StateManager to manager that ignores Close-oks
+ AMQProtocolSession protocolSession=
+ ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
+
+ MethodDispatcher d = protocolSession.getMethodDispatcher();
+
+ MethodDispatcher wrappedDispatcher = (MethodDispatcher)
+ Proxy.newProxyInstance(d.getClass().getClassLoader(),
+ d.getClass().getInterfaces(),
+ new MethodDispatcherProxyHandler(
+ (ClientMethodDispatcherImpl) d));
+
+ protocolSession.setMethodDispatcher(wrappedDispatcher);
+
+
+ AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession);
+ newStateManager.changeState(oldStateManager.getCurrentState());
+
+ ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager);
+
+ final int TEST_CHANNEL=1;
+ _logger.info("Testing Channel(" + TEST_CHANNEL + ") Creation");
+
+ createChannelAndTest(TEST_CHANNEL);
+
+ // Cause it to close
+ try
+ {
+ _logger.info("Closing Channel - invalid exchange");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+ fail("Exchange name is empty so this should fail ");
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+ }
+
+ try
+ {
+ // Send other methods that should be ignored
+ // send them no wait as server will ignore them
+ _logger.info("Tested known exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "topic", "amq.topic", true);
+
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
+
+ // Send sync .. server will igore and timy oue
+ _logger.info("Tested known invalid exchange - should ignore");
+ declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
+ }
+ catch (AMQTimeoutException te)
+ {
+ assertEquals("Request should timeout", AMQConstant.REQUEST_TIMEOUT, te.getErrorCode());
+ }
+ catch (AMQException e)
+ {
+ fail("This should not fail as all requests should be ignored");
+ }
+
+ _logger.info("Sending Close");
+ // Send Close-ok
+ sendClose(TEST_CHANNEL);
+
+ _logger.info("Re-opening channel");
+
+ createChannelAndTest(TEST_CHANNEL);
+
+ // Test connection is still ok
+
+ checkSendingMessage();
+
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _session.close();
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ }
+ }
+*/
+ private void createChannelAndTest(int channel) throws FailoverException
+ {
+ // Create A channel
+ try
+ {
+ createChannel(channel);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ // Test it is ok
+ try
+ {
+ declareExchange(channel, "topic", "amq.topic", false);
+ _logger.info("Tested known exchange");
+ }
+ catch (AMQException e)
+ {
+ fail("This should not fail as this is the default exchange details");
+ }
+ }
+
+ private void sendClose(int channel)
+ {
+ ChannelCloseOkBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody();
+ AMQFrame frame = body.generateFrame(channel);
+
+ ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
+ }
+
+ private void checkSendingMessage() throws JMSException
+ {
+ TEST++;
+ _logger.info("Test creating producer which will use channel id 1");
+
+ Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST);
+
+ MessageConsumer consumer = _session.createConsumer(queue);
+
+ MessageProducer producer = _session.createProducer(queue);
+
+ final String MESSAGE = "CCT_Test_Message";
+ producer.send(_session.createTextMessage(MESSAGE));
+
+ Message msg = consumer.receive(2000);
+
+ assertNotNull("Received messages should not be null.", msg);
+ assertEquals("Message received not what we sent", MESSAGE, ((TextMessage) msg).getText());
+ }
+
+ private Connection newConnection()
+ {
+ Connection connection = null;
+ try
+ {
+ connection = getConnection();
+
+ ((AMQConnection) connection).setConnectionListener(this);
+
+ _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ connection.start();
+
+ }
+ catch (Exception e)
+ {
+ fail("Creating new connection when:" + e.getMessage());
+ }
+
+ return connection;
+ }
+
+ private void declareExchange(int channelId, String _type, String _name, boolean nowait)
+ throws AMQException, FailoverException
+ {
+ ExchangeDeclareBody body =
+ ((AMQConnection) _connection).getProtocolHandler()
+ .getMethodRegistry()
+ .createExchangeDeclareBody(0,
+ new AMQShortString(_name),
+ new AMQShortString(_type),
+ true,
+ false,
+ false,
+ false,
+ nowait,
+ null);
+ AMQFrame exchangeDeclare = body.generateFrame(channelId);
+ AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();
+
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(exchangeDeclare);
+ }
+ else
+ {
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ }
+
+// return null;
+// }
+// }, (AMQConnection)_connection).execute();
+
+ }
+
+ private void createChannel(int channelId) throws AMQException, FailoverException
+ {
+ ChannelOpenBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand
+ ChannelOpenOkBody.class);
+
+ }
+
+ public void onException(JMSException jmsException)
+ {
+ // _logger.info("CCT" + jmsException);
+ fail(jmsException.getMessage());
+ }
+
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return false;
+ }
+
+ public boolean preResubscribe()
+ {
+ return false;
+ }
+
+ public void failoverComplete()
+ { }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
new file mode 100644
index 0000000000..56d03dc4a7
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CloseWithBlockingReceiveTest extends QpidBrokerTestCase
+{
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+
+ public void testReceiveReturnsNull() throws Exception
+ {
+ final AMQConnection connection = (AMQConnection) getConnection("guest", "guest");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana"));
+ connection.start();
+
+ Runnable r = new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(1000);
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ };
+ long startTime = System.currentTimeMillis();
+ new Thread(r).start();
+ consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() - startTime < 10000);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class);
+ }
+
+}