diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java | 329 |
1 files changed, 86 insertions, 243 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java index a0bb31192f..636fb714e0 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -55,31 +55,20 @@ public class ResetMessageListenerTest extends QpidTestCase Context _context; private static final int MSG_COUNT = 6; - private int receivedCount1ML1 = 0; - private int receivedCount1ML2 = 0; - private int receivedCount2 = 0; private Connection _clientConnection, _producerConnection; private MessageConsumer _consumer1; - private MessageConsumer _consumer2; MessageProducer _producer; Session _clientSession, _producerSession; - private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); // all messages Sent Lock - private final CountDownLatch _allFirstMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - - private String oldImmediatePrefetch; + private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock + private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock protected void setUp() throws Exception { super.setUp(); - oldImmediatePrefetch = System.getProperty(AMQSession.IMMEDIATE_PREFETCH); - System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); - _clientConnection = getConnection("guest", "guest"); - + _clientConnection.start(); // Create Client 1 _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -88,9 +77,6 @@ public class ResetMessageListenerTest extends QpidTestCase _consumer1 = _clientSession.createConsumer(queue); - // Create Client 2 on same session - _consumer2 = _clientSession.createConsumer(queue); - // Create Producer _producerConnection = getConnection("guest", "guest"); @@ -107,7 +93,6 @@ public class ResetMessageListenerTest extends QpidTestCase m.setText("Message " + msg); _producer.send(m); } - } protected void tearDown() throws Exception @@ -115,268 +100,126 @@ public class ResetMessageListenerTest extends QpidTestCase _clientConnection.close(); super.tearDown(); - if (oldImmediatePrefetch == null) - { - oldImmediatePrefetch = AMQSession.IMMEDIATE_PREFETCH_DEFAULT; - } - System.setProperty(AMQSession.IMMEDIATE_PREFETCH, oldImmediatePrefetch); } public void testAsynchronousRecieve() { _logger.info("Test Start"); - if (isBroker08()) + + try { - // Set default Message Listener - try + _consumer1.setMessageListener(new MessageListener() { - _consumer1.setMessageListener(new MessageListener() + public void onMessage(Message message) { - public void onMessage(Message message) + try { - _logger.info("Client 1 ML 1 Received Message(" + receivedCount1ML1 + "):" + message); - - receivedCount1ML1++; - if (receivedCount1ML1 == (MSG_COUNT / 2)) + if (message.getStringProperty("rank").equals("first")) { _allFirstMessagesSent.countDown(); } } - }); - } - catch (JMSException e) - { - _logger.error("Error Setting Default ML on consumer1"); - } - - try - { - _consumer2.setMessageListener(new MessageListener() - { - public void onMessage(Message message) + catch (JMSException e) { - _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); - - receivedCount2++; - if (receivedCount2 == (MSG_COUNT / 2)) - { - _logger.info("Client 2 received all its messages1"); - _allFirstMessagesSent.countDown(); - } - - if (receivedCount2 == MSG_COUNT) - { - _logger.info("Client 2 received all its messages2"); - _allSecondMessagesSent.countDown(); - } + e.printStackTrace(); + fail("error receiving message"); } - }); - - _clientConnection.start(); - } - catch (JMSException e) - { - _logger.error("Error Setting Default ML on consumer2"); - - } - - try - { - _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); - _logger.info("Received first batch of messages"); - } - catch (InterruptedException e) - { - // do nothing - } - - try - { - _clientConnection.stop(); - } - catch (JMSException e) - { - _logger.error("Error stopping connection"); - } - - _logger.info("Reset Message Listener to better listener while connection stopped, will restart session"); - try - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - _logger.info("Client 1 ML2 Received Message(" + receivedCount1ML1 + "):" + message); - - receivedCount1ML2++; - if (receivedCount1ML2 == (MSG_COUNT / 2)) - { - _allSecondMessagesSent.countDown(); - } - } - }); - - _clientConnection.start(); - } - catch (javax.jms.IllegalStateException e) - { - _logger.error("Connection not stopped while setting ML", e); - fail("Unable to change message listener:" + e.getCause()); - } - catch (JMSException e) - { - _logger.error("Error Setting Better ML on consumer1", e); - } - - try - { - _logger.info("Send additional messages"); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(_producerSession.createTextMessage("Message " + msg)); } - } - catch (JMSException e) - { - _logger.error("Unable to send additional messages", e); - } - - _logger.info("Waiting upto 2 seconds for messages"); + }); + } + catch (JMSException e) + { + _logger.error("Error Setting Default ML on consumer1"); + } + try + { + assertTrue("Did not receive all first batch of messages", + _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS)); + _logger.info("Received first batch of messages"); + } + catch (InterruptedException e) + { + // do nothing + } - try - { - _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount()); - assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount()); - assertEquals("Client 1 ML1 didn't get all messages", MSG_COUNT / 2, receivedCount1ML1); - assertEquals("Client 2 didn't get all messages", MSG_COUNT, receivedCount2); - assertEquals("Client 1 ML2 didn't get all messages", MSG_COUNT / 2, receivedCount1ML2); + try + { + _clientConnection.stop(); + } + catch (JMSException e) + { + _logger.error("Error stopping connection"); } - else + + _logger.info("Reset Message Listener "); + try { - try + _consumer1.setMessageListener(new MessageListener() { - _consumer2.close(); - _consumer1.setMessageListener(new MessageListener() + public void onMessage(Message message) { - public void onMessage(Message message) + try { - _logger.info("Received Message(" + receivedCount1ML1 + "):" + message); - - try + if (message.getStringProperty("rank").equals("first")) { - if (message.getStringProperty("rank").equals("first")) - { - _allFirstMessagesSent010.countDown(); - } + // Something ugly will happen, it'll probably kill the dispatcher + fail("All first set of messages should have been received"); } - catch (JMSException e) + else { - e.printStackTrace(); - fail("error receiving message"); + _allSecondMessagesSent.countDown(); } } - }); - } - catch (JMSException e) - { - _logger.error("Error Setting Default ML on consumer1"); - } - try - { - _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); - _logger.info("Received first batch of messages"); - } - catch (InterruptedException e) - { - // do nothing - } - - try - { - _clientConnection.stop(); - } - catch (JMSException e) - { - _logger.error("Error stopping connection"); - } - - _logger.info("Reset Message Listener "); - try - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) + catch (JMSException e) { - _logger.info("Received Message(" + receivedCount1ML1 + "):" + message); - - try - { - if (message.getStringProperty("rank").equals("first")) - { - _allFirstMessagesSent010.countDown(); - } - else - { - _allSecondMessagesSent010.countDown(); - } - } - catch (JMSException e) - { - e.printStackTrace(); - fail("error receiving message"); - } + e.printStackTrace(); + // Something ugly will happen, it'll probably kill the dispatcher + fail("error receiving message"); } - }); + } + }); - _clientConnection.start(); - } - catch (javax.jms.IllegalStateException e) - { - _logger.error("Connection not stopped while setting ML", e); - fail("Unable to change message listener:" + e.getCause()); - } - catch (JMSException e) - { - _logger.error("Error Setting Better ML on consumer1", e); - } + _clientConnection.start(); + } + catch (javax.jms.IllegalStateException e) + { + _logger.error("Connection not stopped while setting ML", e); + fail("Unable to change message listener:" + e.getCause()); + } + catch (JMSException e) + { + _logger.error("Error Setting Better ML on consumer1", e); + } - try - { - _logger.info("Send additional messages"); - TextMessage m = _producerSession.createTextMessage(); - m.setStringProperty("rank", "second"); - for (int msg = 0; msg < MSG_COUNT; msg++) - { - m.setText("Message " + msg); - _producer.send(m); - } - } - catch (JMSException e) + try + { + _logger.info("Send additional messages"); + TextMessage m = _producerSession.createTextMessage(); + m.setStringProperty("rank", "second"); + for (int msg = 0; msg < MSG_COUNT; msg++) { - _logger.error("Unable to send additional messages", e); + m.setText("Message " + msg); + _producer.send(m); } + } + catch (JMSException e) + { + _logger.error("Unable to send additional messages", e); + } - _logger.info("Waiting upto 2 seconds for messages"); + _logger.info("Waiting for messages"); - try - { - _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent010.getCount()); - assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent010.getCount()); + try + { + assertTrue(_allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS)); + } + catch (InterruptedException e) + { + // do nothing } + assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount()); + assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount()); } public static junit.framework.Test suite() |