diff options
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted')
5 files changed, 1604 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java new file mode 100644 index 0000000000..4715831de6 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.transacted; + +import org.apache.qpid.client.RejectBehaviour; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * This class tests a number of commits and roll back scenarios + * + * Assumptions; - Assumes empty Queue + * + * @see org.apache.qpid.test.client.RollbackOrderTest + */ +public class CommitRollbackTest extends QpidBrokerTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class); + private static final int POSITIVE_TIMEOUT = 2000; + private static final int NEGATIVE_TIMEOUT = 250; + + protected AMQConnection _conn; + private Session _session; + private MessageProducer _publisher; + private Session _pubSession; + private MessageConsumer _consumer; + private Queue _jmsQueue; + + private void newConnection() throws Exception + { + _logger.debug("calling newConnection()"); + _conn = (AMQConnection) getConnection(); + + _session = _conn.createSession(true, Session.SESSION_TRANSACTED); + + final String queueName = getTestQueueName(); + _jmsQueue = _session.createQueue(queueName); + _consumer = _session.createConsumer(_jmsQueue); + + _pubSession = _conn.createSession(true, Session.SESSION_TRANSACTED); + + _publisher = _pubSession.createProducer(_pubSession.createQueue(queueName)); + + _conn.start(); + } + + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ + public void testPutThenDisconnect() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("reconnecting without commit"); + _conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(NEGATIVE_TIMEOUT); + + // commit to ensure message is removed from queue + _session.commit(); + + assertNull("test message was put and disconnected before commit, but is still present", result); + } + + + /** + * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different + * session as producer + * + * @throws Exception On error + */ + public void testPutThenRollback() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("rolling back"); + _pubSession.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(NEGATIVE_TIMEOUT); + + assertNull("test message was put and rolled back, but is still present", result); + } + + /** + * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection + * + * @throws Exception On error + */ + public void testGetThenDisconnect() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(POSITIVE_TIMEOUT); + assertNotNull("retrieved message is null", msg); + + _logger.info("closing connection"); + _conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(NEGATIVE_TIMEOUT); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the + * same connection but different session as producer + * + * @throws Exception On error + */ + public void testGetThenCloseDisconnect() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseDisconnect"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(POSITIVE_TIMEOUT); + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("reconnecting without commit"); + _consumer.close(); + _conn.close(); + + newConnection(); + + _logger.info("receiving result"); + Message result = _consumer.receive(POSITIVE_TIMEOUT); + + _session.commit(); + + assertNotNull("test message was consumed and disconnected before commit, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + } + + /** + * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt + * session to the producer + * + * @throws Exception On error + */ + public void testGetThenRollback() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(POSITIVE_TIMEOUT); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("rolling back"); + + _session.rollback(); + + _logger.info("receiving result"); + + Message result = _consumer.receive(POSITIVE_TIMEOUT); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); + } + + /** + * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same + * connection but different session as producer + * + * @throws Exception On error + */ + public void testGetThenCloseRollback() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testGetThenCloseRollback"; + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + _logger.info("getting test message"); + + Message msg = _consumer.receive(POSITIVE_TIMEOUT); + + assertNotNull("retrieved message is null", msg); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText()); + + _logger.info("Closing consumer"); + _consumer.close(); + + _logger.info("rolling back"); + _session.rollback(); + + _logger.info("receiving result"); + + _consumer = _session.createConsumer(_jmsQueue); + + Message result = _consumer.receive(POSITIVE_TIMEOUT); + + _session.commit(); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText()); + assertTrue("Message is not marked as redelivered", result.getJMSRedelivered()); + } + + /** + * This test sends two messages receives one of them but doesn't ack it. + * The consumer is then closed + * the first message should be returned as redelivered. + * the second message should be delivered normally. + * @throws Exception + */ + public void testSend2ThenCloseAfter1andTryAgain() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + Message result = _consumer.receive(POSITIVE_TIMEOUT); + + assertNotNull("Message received should not be null", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Message is marked as redelivered" + result, !result.getJMSRedelivered()); + + _logger.info("Closing Consumer"); + + _consumer.close(); + + _logger.info("Creating New consumer"); + _consumer = _session.createConsumer(_jmsQueue); + + _logger.info("receiving result"); + + + // Message 2 may be marked as redelivered if it was prefetched. + result = _consumer.receive(POSITIVE_TIMEOUT); + assertNotNull("Second message was not consumed, but is gone", result); + + // The first message back will be 2, message 1 has been received but not committed + // Closing the consumer does not commit the session. + + // if this is message 1 then it should be marked as redelivered + if("1".equals(((TextMessage) result).getText())) + { + fail("First message was received again"); + } + + result = _consumer.receive(NEGATIVE_TIMEOUT); + assertNull("test message should be null:" + result, result); + + _session.commit(); + } + + public void testPutThenRollbackThenGet() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "testPutThenRollbackThenGet"; + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + _pubSession.commit(); + + assertNotNull(_consumer.receive(POSITIVE_TIMEOUT)); + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _logger.info("rolling back"); + _pubSession.rollback(); + + _logger.info("receiving result"); + Message result = _consumer.receive(NEGATIVE_TIMEOUT); + assertNull("test message was put and rolled back, but is still present", result); + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + assertNotNull(_consumer.receive(POSITIVE_TIMEOUT)); + + _session.commit(); + } + + /** + * Qpid-1163 + * Check that when commit is called inside onMessage then + * the last message is nor redelivered. + * + * @throws Exception + */ + public void testCommitWithinOnMessage() throws Exception + { + newConnection(); + + Queue queue = (Queue) getInitialContext().lookup("queue"); + // create a consumer + MessageConsumer cons = _session.createConsumer(queue); + MessageProducer prod = _session.createProducer(queue); + Message message = _session.createTextMessage("Message"); + message.setJMSCorrelationID("m1"); + prod.send(message); + _session.commit(); + _logger.info("Sent message to queue"); + CountDownLatch cd = new CountDownLatch(1); + cons.setMessageListener(new CommitWithinOnMessageListener(cd)); + _conn.start(); + cd.await(30, TimeUnit.SECONDS); + if( cd.getCount() > 0 ) + { + fail("Did not received message"); + } + // Check that the message has been dequeued + _session.close(); + _conn.close(); + _conn = (AMQConnection) getConnection(); + _conn.start(); + Session session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + cons = session.createConsumer(queue); + message = cons.receiveNoWait(); + if(message != null) + { + if(message.getJMSCorrelationID().equals("m1")) + { + fail("received message twice"); + } + else + { + fail("queue should have been empty, received message: " + message); + } + } + } + + /** + * This test ensures that after exhausting credit (prefetch), a {@link Session#rollback()} successfully + * restores credit and allows the same messages to be re-received. + */ + public void testRollbackSessionAfterCreditExhausted() throws Exception + { + final int maxPrefetch= 5; + + // We send more messages than prefetch size. This ensure that if the 0-10 client were to + // complete the message commands before the rollback command is sent, the broker would + // send additional messages utilising the release credit. This problem would manifest itself + // as an incorrect message (or no message at all) being received at the end of the test. + + final int numMessages = maxPrefetch * 2; + + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch)); + + newConnection(); + + assertEquals("Prefetch not reset", maxPrefetch, ((AMQSession<?, ?>)_session).getDefaultPrefetch()); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + sendMessage(_pubSession, _publisher.getDestination(), numMessages); + _pubSession.commit(); + + for (int i=0 ;i< maxPrefetch; i++) + { + final Message message = _consumer.receive(POSITIVE_TIMEOUT); + assertNotNull("Received:" + i, message); + assertEquals("Unexpected message received", i, message.getIntProperty(INDEX)); + } + + _logger.info("Rolling back"); + _session.rollback(); + + _logger.info("Receiving messages"); + + Message result = _consumer.receive(POSITIVE_TIMEOUT);; + assertNotNull("Message expected", result); + // Expect the first message + assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX)); + } + + private class CommitWithinOnMessageListener implements MessageListener + { + private CountDownLatch _cd; + private CommitWithinOnMessageListener(CountDownLatch cd) + { + _cd = cd; + } + public void onMessage(Message message) + { + try + { + _logger.info("received message " + message); + assertEquals("Wrong message received", message.getJMSCorrelationID(), "m1"); + _logger.info("commit session"); + _session.commit(); + _cd.countDown(); + } + catch (JMSException e) + { + _logger.error("OnMessage error",e); + } + } + } + + + public void testResendUnseenMessagesAfterRollback() throws Exception + { + resendAfterRollback(); + } + + public void testResendUnseenMessagesAfterRollbackWithServerReject() throws Exception + { + setTestSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString()); + resendAfterRollback(); + } + + private void resendAfterRollback() throws Exception + { + newConnection(); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending test message"); + String MESSAGE_TEXT = "message text"; + + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); + + _pubSession.commit(); + + assertNotNull("two messages were sent, but none has been received", _consumer.receive(POSITIVE_TIMEOUT)); + + _session.rollback(); + + _logger.info("receiving result"); + + assertNotNull("two messages were sent, but none has been received", _consumer.receive(POSITIVE_TIMEOUT)); + assertNotNull("two messages were sent, but only one has been received", _consumer.receive(POSITIVE_TIMEOUT)); + assertNull("Only two messages were sent, but more have been received", _consumer.receive(NEGATIVE_TIMEOUT)); + + _session.commit(); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java new file mode 100644 index 0000000000..78c76602c5 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -0,0 +1,389 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.Session; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +public class TransactedTest extends QpidBrokerTestCase +{ + private AMQQueue queue1; + + private AMQConnection con; + private Session session; + private MessageConsumer consumer1; + private MessageProducer producer2; + + private AMQConnection prepCon; + private Session prepSession; + private MessageProducer prepProducer1; + + private AMQConnection testCon; + private Session testSession; + private MessageConsumer testConsumer1; + private MessageConsumer testConsumer2; + private static final Logger _logger = LoggerFactory.getLogger(TransactedTest.class); + + protected void setUp() throws Exception + { + try + { + super.setUp(); + _logger.info("Create Connection"); + con = (AMQConnection) getConnection("guest", "guest"); + _logger.info("Create Session"); + session = con.createSession(true, Session.SESSION_TRANSACTED); + _logger.info("Create Q1"); + queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), + new AMQShortString("Q1"), false, true); + _logger.info("Create Q2"); + AMQQueue queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); + + _logger.info("Create Consumer of Q1"); + consumer1 = session.createConsumer(queue1); + // Dummy just to create the queue. + _logger.info("Create Consumer of Q2"); + MessageConsumer consumer2 = session.createConsumer(queue2); + _logger.info("Close Consumer of Q2"); + consumer2.close(); + + _logger.info("Create producer to Q2"); + producer2 = session.createProducer(queue2); + + _logger.info("Start Connection"); + con.start(); + + _logger.info("Create prep connection"); + prepCon = (AMQConnection) getConnection("guest", "guest"); + + _logger.info("Create prep session"); + prepSession = prepCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + _logger.info("Create prep producer to Q1"); + prepProducer1 = prepSession.createProducer(queue1); + + _logger.info("Create prep connection start"); + prepCon.start(); + + _logger.info("Create test connection"); + testCon = (AMQConnection) getConnection("guest", "guest"); + _logger.info("Create test session"); + testSession = testCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + _logger.info("Create test consumer of q2"); + testConsumer2 = testSession.createConsumer(queue2); + } + catch (Exception e) + { + _logger.error("setup error",e); + stopBroker(); + throw e; + } + } + + protected void tearDown() throws Exception + { + try + { + _logger.info("Close connection"); + con.close(); + _logger.info("Close test connection"); + testCon.close(); + _logger.info("Close prep connection"); + prepCon.close(); + } + catch (Exception e) + { + _logger.error("tear down error",e); + } + finally + { + super.tearDown(); + } + } + + public void testCommit() throws Exception + { + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + + // send and receive some messages + _logger.info("Send X to Q2"); + producer2.send(session.createTextMessage("X")); + _logger.info("Send Y to Q2"); + producer2.send(session.createTextMessage("Y")); + _logger.info("Send Z to Q2"); + producer2.send(session.createTextMessage("Z")); + + _logger.info("Read A from Q1"); + expect("A", consumer1.receive(1000)); + _logger.info("Read B from Q1"); + expect("B", consumer1.receive(1000)); + _logger.info("Read C from Q1"); + expect("C", consumer1.receive(1000)); + + // commit + _logger.info("session commit"); + session.commit(); + _logger.info("Start test Connection"); + testCon.start(); + + // ensure sent messages can be received and received messages are gone + _logger.info("Read X from Q2"); + expect("X", testConsumer2.receive(1000)); + _logger.info("Read Y from Q2"); + expect("Y", testConsumer2.receive(1000)); + _logger.info("Read Z from Q2"); + expect("Z", testConsumer2.receive(1000)); + + _logger.info("create test session on Q1"); + testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Read null from Q1"); + assertTrue(null == testConsumer1.receive(1000)); + _logger.info("Read null from Q2"); + assertTrue(null == testConsumer2.receive(1000)); + } + + public void testRollback() throws Exception + { + // add some messages + _logger.info("Send prep RB_A"); + prepProducer1.send(prepSession.createTextMessage("RB_A")); + _logger.info("Send prep RB_B"); + prepProducer1.send(prepSession.createTextMessage("RB_B")); + _logger.info("Send prep RB_C"); + prepProducer1.send(prepSession.createTextMessage("RB_C")); + + _logger.info("Sending RB_X RB_Y RB_Z"); + producer2.send(session.createTextMessage("RB_X")); + producer2.send(session.createTextMessage("RB_Y")); + producer2.send(session.createTextMessage("RB_Z")); + _logger.info("Receiving RB_A RB_B"); + expect("RB_A", consumer1.receive(1000)); + expect("RB_B", consumer1.receive(1000)); + // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it. + // Quick sleep to ensure 'RB_C' gets pre-fetched + Thread.sleep(500); + + // rollback + _logger.info("rollback"); + session.rollback(); + + _logger.info("Receiving RB_A RB_B RB_C"); + // ensure sent messages are not visible and received messages are requeued + expect("RB_A", consumer1.receive(1000), true); + expect("RB_B", consumer1.receive(1000), true); + expect("RB_C", consumer1.receive(1000), isBroker010()?false:true); + _logger.info("Starting new connection"); + testCon.start(); + testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Testing we have no messages left"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); + + session.commit(); + + _logger.info("Testing we have no messages left after commit"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); + } + + public void testResendsMsgsAfterSessionClose() throws Exception + { + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + + Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); + AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); + MessageConsumer consumer = consumerSession.createConsumer(queue3); + + AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); + Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = producerSession.createProducer(queue3); + + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); + + producerSession.commit(); + + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + assertNotNull(tm); + assertEquals("msg1", tm.getText()); + + consumerSession.commit(); + + _logger.info("Received and committed first message"); + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg2", tm.getText()); + + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg3", tm.getText()); + + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg4", tm.getText()); + + _logger.info("Received all four messages. Closing connection with three outstanding messages"); + + consumerSession.close(); + + consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); + + consumer = consumerSession.createConsumer(queue3); + + // no ack for last three messages so when I call recover I expect to get three messages back + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg2", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg3", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg4", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + + _logger.info("Received redelivery of three messages. Committing"); + + consumerSession.commit(); + + _logger.info("Called commit"); + + tm = (TextMessage) consumer.receive(1000); + assertNull(tm); + + _logger.info("No messages redelivered as is expected"); + + con.close(); + con2.close(); + } + + public void testCommitOnClosedConnection() throws Exception + { + Connection connnection = getConnection(); + javax.jms.Session transactedSession = connnection.createSession(true, Session.SESSION_TRANSACTED); + connnection.close(); + try + { + transactedSession.commit(); + fail("Commit on closed connection should throw IllegalStateException!"); + } + catch(IllegalStateException e) + { + // passed + } + } + + public void testCommitOnClosedSession() throws Exception + { + Connection connnection = getConnection(); + javax.jms.Session transactedSession = connnection.createSession(true, Session.SESSION_TRANSACTED); + transactedSession.close(); + try + { + transactedSession.commit(); + fail("Commit on closed session should throw IllegalStateException!"); + } + catch(IllegalStateException e) + { + // passed + } + } + + public void testRollbackOnClosedSession() throws Exception + { + Connection connnection = getConnection(); + javax.jms.Session transactedSession = connnection.createSession(true, Session.SESSION_TRANSACTED); + transactedSession.close(); + try + { + transactedSession.rollback(); + fail("Rollback on closed session should throw IllegalStateException!"); + } + catch(IllegalStateException e) + { + // passed + } + } + + public void testGetTransactedOnClosedSession() throws Exception + { + Connection connnection = getConnection(); + javax.jms.Session transactedSession = connnection.createSession(true, Session.SESSION_TRANSACTED); + transactedSession.close(); + try + { + transactedSession.getTransacted(); + fail("According to Sun TCK invocation of Session#getTransacted on closed session should throw IllegalStateException!"); + } + catch(IllegalStateException e) + { + // passed + } + } + + private void expect(String text, Message msg) throws JMSException + { + expect(text, msg, false); + } + + private void expect(String text, Message msg, boolean requeued) throws JMSException + { + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); + assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered()); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TransactedTest.class); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java new file mode 100644 index 0000000000..e37c6cf54b --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +/** + * This verifies that the default behaviour is not to time out transactions. + */ +public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase +{ + @Override + protected void configure() throws Exception + { + // Setup housekeeping every second + TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); + setTestSystemProperty("virtualhost.housekeepingCheckPeriod", "100"); + + // No transaction timeout configuration. + } + + public void testProducerIdleCommit() throws Exception + { + try + { + send(5, 0); + + sleep(2.0f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + send(5, 0.3f); + + _psession.commit(); + } + catch (Exception e) + { + fail("Should have succeeded"); + } + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java new file mode 100644 index 0000000000..b84e03972d --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -0,0 +1,350 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; + +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +/** + * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration + * is set for a virtual host. + * + * A producer that is idle for too long or open for too long will have its connection/session(0-10) closed and + * any further operations will fail with a 408 resource timeout exception. Consumers will not + * be affected by the transaction timeout configuration. + */ +public class TransactionTimeoutTest extends TransactionTimeoutTestCase +{ + + protected void configure() throws Exception + { + // switch off connection close in order to test timeout on publishing of unroutable messages + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, false); + + // Setup housekeeping every 100ms + TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); + setTestSystemProperty("virtualhost.housekeepingCheckPeriod","100"); + + if (getName().contains("ProducerIdle")) + { + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "0"); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "0"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "500"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "1500"); + } + else if (getName().contains("ProducerOpen")) + { + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "1000"); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "2000"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "0"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "0"); + } + else + { + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "1000"); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "2000"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "500"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "1500"); + } + } + + public void testProducerIdle() throws Exception + { + sleep(2.0f); + + _psession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + public void testProducerIdleCommit() throws Exception + { + send(5, 0); + // Idle for more than idleClose to generate idle-warns and cause a close. + sleep(2.0f); + + try + { + _psession.commit(); + fail("Exception not thrown"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testProducerIdleCommitTwice() throws Exception + { + send(5, 0); + // Idle for less than idleClose to generate idle-warns + sleep(1.0f); + + _psession.commit(); + + send(5, 0); + // Now idle for more than idleClose to generate more idle-warns and cause a close. + sleep(2.0f); + + try + { + _psession.commit(); + fail("Exception not thrown"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(15, 0); + + check(IDLE); + } + + public void testProducerIdleRollback() throws Exception + { + send(5, 0); + // Now idle for more than idleClose to generate more idle-warns and cause a close. + sleep(2.0f); + try + { + _psession.rollback(); + fail("Exception not thrown"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(10, 0); + + check(IDLE); + } + + public void testProducerIdleRollbackTwice() throws Exception + { + send(5, 0); + // Idle for less than idleClose to generate idle-warns + sleep(1.0f); + _psession.rollback(); + send(5, 0); + // Now idle for more than idleClose to generate more idle-warns and cause a close. + sleep(2.0f); + try + { + _psession.rollback(); + fail("should fail"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(15, 0); + + check(IDLE); + } + + public void testProducerOpenCommit() throws Exception + { + try + { + // Sleep between sends to cause open warns and then cause a close. + send(6, 0.5f); + _psession.commit(); + fail("Exception not thrown"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(0, 10); + + check(OPEN); + } + + public void testProducerOpenCommitTwice() throws Exception + { + send(5, 0); + sleep(1.0f); + _psession.commit(); + + try + { + // Now sleep between sends to cause open warns and then cause a close. + send(6, 0.5f); + _psession.commit(); + fail("Exception not thrown"); + } + catch (Exception e) + { + _exception = e; + } + + monitor(0, 10); + + check(OPEN); + } + + public void testConsumerCommitClose() throws Exception + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + _csession.commit(); + + sleep(3.0f); + + _csession.close(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + public void testConsumerIdleReceiveCommit() throws Exception + { + send(1, 0); + + _psession.commit(); + + sleep(2.0f); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + public void testConsumerIdleCommit() throws Exception + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + public void testConsumerIdleRollback() throws Exception + { + send(1, 0); + + _psession.commit(); + + expect(1, 0); + + sleep(2.0f); + + _csession.rollback(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + public void testConsumerOpenCommit() throws Exception + { + send(1, 0); + + _psession.commit(); + + sleep(3.0f); + + _csession.commit(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + public void testConsumerOpenRollback() throws Exception + { + send(1, 0); + + _psession.commit(); + + sleep(3.0f); + + _csession.rollback(); + + assertEquals("Listener should not have received exception", 0, getNumberOfDeliveredExceptions()); + + monitor(0, 0); + } + + /** + * Tests that sending an unroutable persistent message does not result in a long running store transaction [warning]. + */ + public void testTransactionCommittedOnNonRoutableQueuePersistentMessage() throws Exception + { + checkTransactionCommittedOnNonRoutableQueueMessage(DeliveryMode.PERSISTENT); + } + + /** + * Tests that sending an unroutable transient message does not result in a long running store transaction [warning]. + */ + public void testTransactionCommittedOnNonRoutableQueueTransientMessage() throws Exception + { + checkTransactionCommittedOnNonRoutableQueueMessage(DeliveryMode.NON_PERSISTENT); + } + + private void checkTransactionCommittedOnNonRoutableQueueMessage(int deliveryMode) throws JMSException, Exception + { + Queue nonExisting = _psession.createQueue(getTestQueueName() + System.currentTimeMillis()); + MessageProducer producer = _psession.createProducer(nonExisting); + Message message = _psession.createMessage(); + producer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + _psession.commit(); + + // give time to house keeping thread to log messages + sleep(3f); + monitor(0, 0); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java new file mode 100644 index 0000000000..98fe29f826 --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.LogMonitor; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The {@link TestCase} for transaction timeout testing. + */ +public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase implements ExceptionListener +{ + private static final int ALERT_MESSAGE_TOLERANCE = 6; + public static final String VIRTUALHOST = "test"; + public static final String TEXT = "0123456789abcdefghiforgettherest"; + public static final String CHN_OPEN_TXN = "CHN-1007"; + public static final String CHN_IDLE_TXN = "CHN-1008"; + public static final String IDLE = "Idle"; + public static final String OPEN = "Open"; + + protected LogMonitor _monitor; + protected Connection _con; + protected Session _psession, _csession; + protected Queue _queue; + protected MessageConsumer _consumer; + protected MessageProducer _producer; + protected Exception _exception; + + private final CountDownLatch _exceptionListenerLatch = new CountDownLatch(1); + private final AtomicInteger _exceptionCount = new AtomicInteger(0); + private volatile AMQConstant _linkedExceptionCode; + private volatile String _linkedExceptionMessage; + + /** + * Subclasses must implement this to configure transaction timeout parameters. + */ + protected abstract void configure() throws Exception; + + @Override + protected void setUp() throws Exception + { + // Configure timeouts + configure(); + + // Monitor log file + _monitor = new LogMonitor(_outputFile); + + // Start broker + super.setUp(); + + // Connect to broker + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(1)); + _con = getConnection(); + _con.setExceptionListener(this); + _con.start(); + + // Create queue + Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED); + _queue = qsession.createQueue(getTestQueueName()); + qsession.close(); + + // Create producer and consumer + producer(); + consumer(); + } + + /** + * Create a transacted persistent message producer session. + */ + protected void producer() throws Exception + { + _psession = _con.createSession(true, Session.SESSION_TRANSACTED); + _producer = _psession.createProducer(_queue); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + + /** + * Create a transacted message consumer session. + */ + protected void consumer() throws Exception + { + _csession = _con.createSession(true, Session.SESSION_TRANSACTED); + _consumer = _csession.createConsumer(_queue); + } + + /** + * Send a number of messages to the queue, optionally pausing after each. + * + * Need to sync to ensure that the Broker has received the message(s) in order + * the test and broker start timing the idle transaction from the same point in time. + */ + protected void send(int count, float delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _psession.createTextMessage(TEXT); + msg.setIntProperty("i", i); + _producer.send(msg); + } + + ((AMQSession<?, ?>)_psession).sync(); + } + + /** + * Sleep for a number of seconds. + */ + protected void sleep(float seconds) throws Exception + { + try + { + Thread.sleep((long) (seconds * 1000.0f)); + } + catch (InterruptedException ie) + { + throw new RuntimeException("Interrupted"); + } + } + + /** + * Check for idle and open messages. + * + * Either exactly zero messages, or +-2 error accepted around the specified number. + */ + protected void monitor(int idle, int open) throws Exception + { + List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN); + List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN); + + String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages"; + String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages"; + + if (idle == 0) + { + assertTrue(idleErr, idleMsgs.isEmpty()); + } + else + { + assertTrue(idleErr, idleMsgs.size() >= idle - ALERT_MESSAGE_TOLERANCE && idleMsgs.size() <= idle + ALERT_MESSAGE_TOLERANCE); + } + + if (open == 0) + { + assertTrue(openErr, openMsgs.isEmpty()); + } + else + { + assertTrue(openErr, openMsgs.size() >= open - ALERT_MESSAGE_TOLERANCE && openMsgs.size() <= open + ALERT_MESSAGE_TOLERANCE); + } + } + + /** + * Receive a number of messages, optionally pausing after each. + */ + protected void expect(int count, float delay) throws Exception + { + for (int i = 0; i < count; i++) + { + sleep(delay); + Message msg = _consumer.receive(1000); + assertNotNull("Message should not be null", msg); + assertTrue("Message should be a text message", msg instanceof TextMessage); + assertEquals("Message content does not match expected", TEXT, ((TextMessage) msg).getText()); + assertEquals("Message order is incorrect", i, msg.getIntProperty("i")); + } + } + + /** + * Checks that the correct exception was thrown and was received + * by the listener with a 506 error code. + */ + protected void check(String reason) throws InterruptedException + { + assertNotNull("Should have thrown exception to client", _exception); + + assertTrue("Should have caught exception in listener", _exceptionListenerLatch.await(1, TimeUnit.SECONDS)); + assertNotNull("Linked exception message should not be null", _linkedExceptionMessage); + assertTrue("Linked exception message '" + _linkedExceptionMessage + "' should contain '" + reason + "'", + _linkedExceptionMessage.contains(reason + " transaction timed out")); + assertNotNull("Linked exception should have an error code", _linkedExceptionCode); + assertEquals("Linked exception error code should be 506", AMQConstant.RESOURCE_ERROR, _linkedExceptionCode); + } + + /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ + @Override + public void onException(JMSException jmse) + { + if (jmse.getLinkedException() != null) + { + _linkedExceptionMessage = jmse.getLinkedException().getMessage(); + } + + if (jmse.getLinkedException() instanceof AMQException) + { + _linkedExceptionCode = ((AMQException) jmse.getLinkedException()).getErrorCode(); + } + _exceptionCount.incrementAndGet(); + _exceptionListenerLatch.countDown(); + } + + protected int getNumberOfDeliveredExceptions() + { + return _exceptionCount.get(); + } +} |