summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java297
1 files changed, 136 insertions, 161 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 045deab052..653ab8f733 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -135,197 +135,172 @@ public class TransactedTest extends QpidBrokerTestCase
public void testCommit() throws Exception
{
- try
- {
-// add some messages
- _logger.info("Send prep A");
- prepProducer1.send(prepSession.createTextMessage("A"));
- _logger.info("Send prep B");
- prepProducer1.send(prepSession.createTextMessage("B"));
- _logger.info("Send prep C");
- prepProducer1.send(prepSession.createTextMessage("C"));
-
- // send and receive some messages
- _logger.info("Send X to Q2");
- producer2.send(session.createTextMessage("X"));
- _logger.info("Send Y to Q2");
- producer2.send(session.createTextMessage("Y"));
- _logger.info("Send Z to Q2");
- producer2.send(session.createTextMessage("Z"));
-
- _logger.info("Read A from Q1");
- expect("A", consumer1.receive(1000));
- _logger.info("Read B from Q1");
- expect("B", consumer1.receive(1000));
- _logger.info("Read C from Q1");
- expect("C", consumer1.receive(1000));
-
- // commit
- _logger.info("session commit");
- session.commit();
- _logger.info("Start test Connection");
- testCon.start();
-
- // ensure sent messages can be received and received messages are gone
- _logger.info("Read X from Q2");
- expect("X", testConsumer2.receive(1000));
- _logger.info("Read Y from Q2");
- expect("Y", testConsumer2.receive(1000));
- _logger.info("Read Z from Q2");
- expect("Z", testConsumer2.receive(1000));
-
- _logger.info("create test session on Q1");
- testConsumer1 = testSession.createConsumer(queue1);
- _logger.info("Read null from Q1");
- assertTrue(null == testConsumer1.receive(1000));
- _logger.info("Read null from Q2");
- assertTrue(null == testConsumer2.receive(1000));
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
+ // send and receive some messages
+ _logger.info("Send X to Q2");
+ producer2.send(session.createTextMessage("X"));
+ _logger.info("Send Y to Q2");
+ producer2.send(session.createTextMessage("Y"));
+ _logger.info("Send Z to Q2");
+ producer2.send(session.createTextMessage("Z"));
+
+ _logger.info("Read A from Q1");
+ expect("A", consumer1.receive(1000));
+ _logger.info("Read B from Q1");
+ expect("B", consumer1.receive(1000));
+ _logger.info("Read C from Q1");
+ expect("C", consumer1.receive(1000));
+
+ // commit
+ _logger.info("session commit");
+ session.commit();
+ _logger.info("Start test Connection");
+ testCon.start();
+
+ // ensure sent messages can be received and received messages are gone
+ _logger.info("Read X from Q2");
+ expect("X", testConsumer2.receive(1000));
+ _logger.info("Read Y from Q2");
+ expect("Y", testConsumer2.receive(1000));
+ _logger.info("Read Z from Q2");
+ expect("Z", testConsumer2.receive(1000));
+
+ _logger.info("create test session on Q1");
+ testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Read null from Q1");
+ assertTrue(null == testConsumer1.receive(1000));
+ _logger.info("Read null from Q2");
+ assertTrue(null == testConsumer2.receive(1000));
}
public void testRollback() throws Exception
{
- try
- {
-// add some messages
- _logger.info("Send prep RB_A");
- prepProducer1.send(prepSession.createTextMessage("RB_A"));
- _logger.info("Send prep RB_B");
- prepProducer1.send(prepSession.createTextMessage("RB_B"));
- _logger.info("Send prep RB_C");
- prepProducer1.send(prepSession.createTextMessage("RB_C"));
-
- _logger.info("Sending RB_X RB_Y RB_Z");
- producer2.send(session.createTextMessage("RB_X"));
- producer2.send(session.createTextMessage("RB_Y"));
- producer2.send(session.createTextMessage("RB_Z"));
- _logger.info("Receiving RB_A RB_B");
- expect("RB_A", consumer1.receive(1000));
- expect("RB_B", consumer1.receive(1000));
- // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
- // Quick sleep to ensure 'RB_C' gets pre-fetched
- Thread.sleep(500);
-
- // rollback
- _logger.info("rollback");
- session.rollback();
-
- _logger.info("Receiving RB_A RB_B RB_C");
- // ensure sent messages are not visible and received messages are requeued
- expect("RB_A", consumer1.receive(1000), true);
- expect("RB_B", consumer1.receive(1000), true);
- expect("RB_C", consumer1.receive(1000), true);
- _logger.info("Starting new connection");
- testCon.start();
- testConsumer1 = testSession.createConsumer(queue1);
- _logger.info("Testing we have no messages left");
- assertTrue(null == testConsumer1.receive(1000));
- assertTrue(null == testConsumer2.receive(1000));
-
- session.commit();
-
- _logger.info("Testing we have no messages left after commit");
- assertTrue(null == testConsumer1.receive(1000));
- assertTrue(null == testConsumer2.receive(1000));
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // add some messages
+ _logger.info("Send prep RB_A");
+ prepProducer1.send(prepSession.createTextMessage("RB_A"));
+ _logger.info("Send prep RB_B");
+ prepProducer1.send(prepSession.createTextMessage("RB_B"));
+ _logger.info("Send prep RB_C");
+ prepProducer1.send(prepSession.createTextMessage("RB_C"));
+
+ _logger.info("Sending RB_X RB_Y RB_Z");
+ producer2.send(session.createTextMessage("RB_X"));
+ producer2.send(session.createTextMessage("RB_Y"));
+ producer2.send(session.createTextMessage("RB_Z"));
+ _logger.info("Receiving RB_A RB_B");
+ expect("RB_A", consumer1.receive(1000));
+ expect("RB_B", consumer1.receive(1000));
+ // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+ // Quick sleep to ensure 'RB_C' gets pre-fetched
+ Thread.sleep(500);
+
+ // rollback
+ _logger.info("rollback");
+ session.rollback();
+
+ _logger.info("Receiving RB_A RB_B RB_C");
+ // ensure sent messages are not visible and received messages are requeued
+ expect("RB_A", consumer1.receive(1000), true);
+ expect("RB_B", consumer1.receive(1000), true);
+ expect("RB_C", consumer1.receive(1000), isBroker010()?false:true);
+ _logger.info("Starting new connection");
+ testCon.start();
+ testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Testing we have no messages left");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
+
+ session.commit();
+
+ _logger.info("Testing we have no messages left after commit");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
}
public void testResendsMsgsAfterSessionClose() throws Exception
{
- try
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
- AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
- MessageConsumer consumer = consumerSession.createConsumer(queue3);
+ Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+ AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue3);
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSession.createProducer(queue3);
+ AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
+ Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSession.createProducer(queue3);
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
- producerSession.commit();
+ producerSession.commit();
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- assertNotNull(tm);
- assertEquals("msg1", tm.getText());
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ assertNotNull(tm);
+ assertEquals("msg1", tm.getText());
- consumerSession.commit();
+ consumerSession.commit();
- _logger.info("Received and committed first message");
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg2", tm.getText());
+ _logger.info("Received and committed first message");
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg3", tm.getText());
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg4", tm.getText());
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
- _logger.info("Received all four messages. Closing connection with three outstanding messages");
+ _logger.info("Received all four messages. Closing connection with three outstanding messages");
- consumerSession.close();
+ consumerSession.close();
- consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+ consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
- consumer = consumerSession.createConsumer(queue3);
+ consumer = consumerSession.createConsumer(queue3);
- // no ack for last three messages so when I call recover I expect to get three messages back
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg2", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ // no ack for last three messages so when I call recover I expect to get three messages back
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg3", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg4", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- _logger.info("Received redelivery of three messages. Committing");
+ _logger.info("Received redelivery of three messages. Committing");
- consumerSession.commit();
+ consumerSession.commit();
- _logger.info("Called commit");
+ _logger.info("Called commit");
- tm = (TextMessage) consumer.receive(1000);
- assertNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNull(tm);
- _logger.info("No messages redelivered as is expected");
+ _logger.info("No messages redelivered as is expected");
- con.close();
- con2.close();
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ con.close();
+ con2.close();
}
private void expect(String text, Message msg) throws JMSException