diff options
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java | 42 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java | 101 |
2 files changed, 80 insertions, 63 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java index a703432efb..e786981183 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/xa/QueueTest.java @@ -39,10 +39,17 @@ public class QueueTest extends AbstractXATestCase * the queue connection factory used by all tests */ private static XAQueueConnectionFactory _queueFactory = null; + /** - * standard queue connection + * standard xa queue connection */ - private static XAQueueConnection _queueConnection = null; + private static XAQueueConnection _xaqueueConnection= null; + + /** + * standard xa queue connection + */ + private static QueueConnection _queueConnection=null; + /** * standard queue session created from the standard connection @@ -85,7 +92,7 @@ public class QueueTest extends AbstractXATestCase { try { - _queueConnection.stop(); + _xaqueueConnection.close(); _queueConnection.close(); } catch (Exception e) @@ -125,7 +132,7 @@ public class QueueTest extends AbstractXATestCase // create standard connection try { - _queueConnection = getNewQueueXAConnection(); + _xaqueueConnection= getNewQueueXAConnection(); } catch (JMSException e) { @@ -135,7 +142,7 @@ public class QueueTest extends AbstractXATestCase XAQueueSession session = null; try { - session = _queueConnection.createXAQueueSession(); + session = _xaqueueConnection.createXAQueueSession(); } catch (JMSException e) { @@ -144,6 +151,7 @@ public class QueueTest extends AbstractXATestCase // create a standard session try { + _queueConnection = _queueFactory.createQueueConnection(); _nonXASession = _queueConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) @@ -183,7 +191,7 @@ public class QueueTest extends AbstractXATestCase try { // start the connection - _queueConnection.start(); + _xaqueueConnection.start(); // produce a message with sequence number 1 _message.setLongProperty(_sequenceNumberPropertyName, 1); _producer.send(_message); @@ -247,7 +255,7 @@ public class QueueTest extends AbstractXATestCase // receive a message from queue test we expect it to be the second one try { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); + TextMessage message = (TextMessage) _consumer.receive(1000); if (message == null) { fail("did not receive second message as expected "); @@ -278,9 +286,11 @@ public class QueueTest extends AbstractXATestCase // We should now be able to receive the first message try { + _xaqueueConnection.close(); Session nonXASession = _nonXASession; MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + _queueConnection.start(); + TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 == null) { fail("did not receive first message as expected "); @@ -296,7 +306,7 @@ public class QueueTest extends AbstractXATestCase // commit that transacted session nonXASession.commit(); // the queue should be now empty - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 != null) { fail("receive an unexpected message "); @@ -330,7 +340,7 @@ public class QueueTest extends AbstractXATestCase try { // start the connection - _queueConnection.start(); + _xaqueueConnection.start(); // produce a message with sequence number 1 _message.setLongProperty(_sequenceNumberPropertyName, 1); _producer.send(_message); @@ -363,6 +373,7 @@ public class QueueTest extends AbstractXATestCase { _logger.debug("stopping broker"); shutdownServer(); + init(); } catch (Exception e) { @@ -412,10 +423,11 @@ public class QueueTest extends AbstractXATestCase // the queue should contain the first message! try { + _xaqueueConnection.close(); Session nonXASession = _nonXASession; MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); _queueConnection.start(); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 == null) { @@ -459,7 +471,7 @@ public class QueueTest extends AbstractXATestCase try { // start the connection - _queueConnection.start(); + _xaqueueConnection.start(); // produce a message with sequence number 1 _message.setLongProperty(_sequenceNumberPropertyName, 1); _producer.send(_message); @@ -516,7 +528,7 @@ public class QueueTest extends AbstractXATestCase // receive a message from queue test we expect it to be the second one try { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); + TextMessage message = (TextMessage) _consumer.receive(1000); if (message == null || message.getLongProperty(_sequenceNumberPropertyName) != 2) { fail("did not receive second message as expected "); @@ -550,6 +562,7 @@ public class QueueTest extends AbstractXATestCase { _logger.debug("stopping broker"); shutdownServer(); + init(); } catch (Exception e) { @@ -607,10 +620,11 @@ public class QueueTest extends AbstractXATestCase // the queue should be empty try { + _xaqueueConnection.close(); Session nonXASession = _nonXASession; MessageConsumer nonXAConsumer = nonXASession.createConsumer(_queue); _queueConnection.start(); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 != null) { fail("The queue is not empty! "); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java index 5ea059b166..cac0350dec 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/xa/TopicTest.java @@ -267,7 +267,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug("receiving a message from topic test we expect it to be the second one"); try { - TextMessage message = (TextMessage) _consumer.receiveNoWait(); + TextMessage message = (TextMessage) _consumer.receive(1000); if (message == null) { fail("did not receive second message as expected "); @@ -298,7 +298,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug("We should now be able to receive the first and second message"); try { - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 == null) { fail("did not receive first message as expected "); @@ -311,7 +311,7 @@ public class TopicTest extends AbstractXATestCase .getLongProperty(_sequenceNumberPropertyName)); } } - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 == null) { fail("did not receive first message as expected "); @@ -327,7 +327,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug("commit transacted session"); nonXASession.commit(); _logger.debug("Test that the topic is now empty"); - message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 != null) { fail("receive an unexpected message "); @@ -390,7 +390,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug("start xid2"); _xaResource.start(xid2, XAResource.TMSUCCESS); _logger.debug("receive the previously produced message"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + TextMessage message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received "); @@ -432,7 +432,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug("start xid3"); _xaResource.start(xid3, XAResource.TMSUCCESS); _logger.debug(" receive the previously aborted consumed message"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + TextMessage message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received "); @@ -459,7 +459,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug("start xid4"); _xaResource.start(xid4, XAResource.TMSUCCESS); _logger.debug("check that topic is empty"); - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + TextMessage message = (TextMessage) xaDurSub.receive(1000); if (message != null) { fail("An unexpected message was received "); @@ -547,7 +547,7 @@ public class TopicTest extends AbstractXATestCase // receive the 2 first messages for (int i = 1; i <= 2; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -563,7 +563,7 @@ public class TopicTest extends AbstractXATestCase // receive the 2 first messages for (int i = 3; i <= 4; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -579,7 +579,7 @@ public class TopicTest extends AbstractXATestCase // receive the 2 first messages for (int i = 5; i <= 6; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -608,7 +608,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug(" 3, 4 and 7"); for (int i = 1; i <= 3; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + 3); @@ -651,7 +651,7 @@ public class TopicTest extends AbstractXATestCase for (int i = 1; i <= 5; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName)); if (message == null) { @@ -678,7 +678,7 @@ public class TopicTest extends AbstractXATestCase // start xid6 _xaResource.start(xid6, XAResource.TMSUCCESS); // should now be empty - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message != null) { fail("An unexpected message was received " + message @@ -773,7 +773,7 @@ public class TopicTest extends AbstractXATestCase // receive the 2 first messages for (int i = 1; i <= 2; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -789,7 +789,7 @@ public class TopicTest extends AbstractXATestCase // receive the 2 first messages for (int i = 3; i <= 4; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -805,7 +805,7 @@ public class TopicTest extends AbstractXATestCase // receive the 2 first messages for (int i = 5; i <= 6; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -830,6 +830,7 @@ public class TopicTest extends AbstractXATestCase try { shutdownServer(); + init(); } catch (Exception e) { @@ -866,7 +867,7 @@ public class TopicTest extends AbstractXATestCase // receive the 2 first messages for (int i = 1; i <= 2; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -883,7 +884,7 @@ public class TopicTest extends AbstractXATestCase // receive 3 message within tx1: 3, 4 and 7 _xaResource.start(xid1, XAResource.TMRESUME); // receive messages 3, 4 and 7 - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + 3); @@ -893,7 +894,7 @@ public class TopicTest extends AbstractXATestCase fail("wrong sequence number: " + message .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected"); } - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + 4); @@ -903,7 +904,7 @@ public class TopicTest extends AbstractXATestCase fail("wrong sequence number: " + message .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected"); } - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + 7); @@ -942,7 +943,7 @@ public class TopicTest extends AbstractXATestCase _xaResource.start(xid4, XAResource.TMSUCCESS); for (int i = 1; i <= 4; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -957,7 +958,7 @@ public class TopicTest extends AbstractXATestCase _xaResource.start(xid5, XAResource.TMSUCCESS); for (int i = 7; i <= 10; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -975,7 +976,7 @@ public class TopicTest extends AbstractXATestCase _xaResource.start(xid5, XAResource.TMRESUME); for (int i = 1; i <= 4; i++) { - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received! expected: " + i); @@ -1001,7 +1002,7 @@ public class TopicTest extends AbstractXATestCase // start xid6 _xaResource.start(xid6, XAResource.TMSUCCESS); // should now be empty - message = (TextMessage) xaDurSub.receiveNoWait(); + message = (TextMessage) xaDurSub.receive(1000); if (message != null) { fail("An unexpected message was received " + message @@ -1067,7 +1068,7 @@ public class TopicTest extends AbstractXATestCase _message.setLongProperty(_sequenceNumberPropertyName, 1); _producer.send(_message); // commit - _xaResource.end(xid1, XAResource.TMSUSPEND); + _xaResource.end(xid1, XAResource.TMSUCCESS); if (_xaResource.prepare(xid1) != XAResource.XA_OK) { fail("Problem when preparing tx1 "); @@ -1084,7 +1085,7 @@ public class TopicTest extends AbstractXATestCase // start xid2 _xaResource.start(xid2, XAResource.TMSUCCESS); // receive the previously produced message - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + TextMessage message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received "); @@ -1094,7 +1095,7 @@ public class TopicTest extends AbstractXATestCase fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); } // prepare xid2 - _xaResource.end(xid2, XAResource.TMSUSPEND); + _xaResource.end(xid2, XAResource.TMSUCCESS); if (_xaResource.prepare(xid2) != XAResource.XA_OK) { fail("Problem when preparing tx2 "); @@ -1110,6 +1111,7 @@ public class TopicTest extends AbstractXATestCase try { shutdownServer(); + init(); } catch (Exception e) { @@ -1166,7 +1168,7 @@ public class TopicTest extends AbstractXATestCase // start xid3 _xaResource.start(xid3, XAResource.TMSUCCESS); // receive the previously produced message and aborted - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + TextMessage message = (TextMessage) xaDurSub.receive(1000); if (message == null) { fail("no message received "); @@ -1176,7 +1178,7 @@ public class TopicTest extends AbstractXATestCase fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName)); } // commit xid3 - _xaResource.end(xid3, XAResource.TMSUSPEND); + _xaResource.end(xid3, XAResource.TMSUCCESS); if (_xaResource.prepare(xid3) != XAResource.XA_OK) { fail("Problem when preparing tx3 "); @@ -1193,14 +1195,14 @@ public class TopicTest extends AbstractXATestCase // start xid4 _xaResource.start(xid4, XAResource.TMSUCCESS); // should now be empty - TextMessage message = (TextMessage) xaDurSub.receiveNoWait(); + TextMessage message = (TextMessage) xaDurSub.receive(1000); if (message != null) { fail("An unexpected message was received " + message .getLongProperty(_sequenceNumberPropertyName)); } // commit xid4 - _xaResource.end(xid4, XAResource.TMSUSPEND); + _xaResource.end(xid4, XAResource.TMSUCCESS); _xaResource.commit(xid4, true); } catch (Exception e) @@ -1239,12 +1241,10 @@ public class TopicTest extends AbstractXATestCase { Xid xid1 = getNewXid(); String durSubName = "test1"; - TopicSession nonXASession1; try { // create a dummy durable subscriber to be sure that messages are persisted! - nonXASession1 = _nonXASession; - nonXASession1.createDurableSubscriber(_topic, durSubName); + _nonXASession.createDurableSubscriber(_topic, durSubName); // start the xaResource for xid1 try { @@ -1289,6 +1289,7 @@ public class TopicTest extends AbstractXATestCase try { shutdownServer(); + init(); } catch (Exception e) { @@ -1297,7 +1298,7 @@ public class TopicTest extends AbstractXATestCase try { - MessageConsumer nonXAConsumer = nonXASession1.createDurableSubscriber(_topic, durSubName); + MessageConsumer nonXAConsumer = _nonXASession.createDurableSubscriber(_topic, durSubName); _topicConnection.start(); // get the list of in doubt transactions try @@ -1341,19 +1342,21 @@ public class TopicTest extends AbstractXATestCase fail("exception thrown when recovering transactions " + e.getMessage()); } _logger.debug("the topic should not be empty"); - TextMessage message1 = (TextMessage) nonXAConsumer.receiveNoWait(); + TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000); if (message1 == null) { fail("The topic is empty! "); } } - catch (JMSException e) + catch (Exception e) { + e.printStackTrace(); fail("Exception thrown when testin that queue test is empty: " + e.getMessage()); } } catch (JMSException e) { + e.printStackTrace(); fail("cannot create dummy durable subscriber: " + e.getMessage()); } finally @@ -1410,7 +1413,7 @@ public class TopicTest extends AbstractXATestCase stSession.commit(); } _logger.debug("consume the first message with that durable subscriber"); - message = (TextMessage) durSub.receiveNoWait(); + message = (TextMessage) durSub.receive(1000); if (message == null) { fail("no message received "); @@ -1428,14 +1431,14 @@ public class TopicTest extends AbstractXATestCase _xaResource.start(xid1, XAResource.TMSUCCESS); durSub = _session.createDurableSubscriber(_topic, durSubName); _logger.debug(" consume the second message with that xa durable subscriber and abort it"); - message = (TextMessage) durSub.receiveNoWait(); + message = (TextMessage) durSub.receive(1000); if (message == null) { fail("no message received "); } else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) { - fail("wrong sequence number, 2 expected, received: " + message + System.out.println("wrong sequence number, 2 expected, received: " + message .getLongProperty(_sequenceNumberPropertyName)); } _xaResource.end(xid1, XAResource.TMSUCCESS); @@ -1450,30 +1453,30 @@ public class TopicTest extends AbstractXATestCase durSub1 = stSession.createDurableSubscriber(_topic, durSubName + "_second"); _logger.debug("Reconnected to durablse subscribers"); _logger.debug(" consume the 2 remaining messages"); - message = (TextMessage) durSub.receiveNoWait(); + message = (TextMessage) durSub.receive(1000); if (message == null) { fail("no message received "); } else if (message.getLongProperty(_sequenceNumberPropertyName) != 2) { - fail("wrong sequence number, 2 expected, received: " + message + System.out.println("wrong sequence number, 2 expected, received: " + message .getLongProperty(_sequenceNumberPropertyName)); } // consume the third message with that xa durable subscriber - message = (TextMessage) durSub.receiveNoWait(); + message = (TextMessage) durSub.receive(1000); if (message == null) { fail("no message received "); } else if (message.getLongProperty(_sequenceNumberPropertyName) != 3) { - fail("wrong sequence number, 3 expected, received: " + message + System.out.println("wrong sequence number, 3 expected, received: " + message .getLongProperty(_sequenceNumberPropertyName)); } stSession.commit(); _logger.debug("the topic should be empty now"); - message = (TextMessage) durSub.receiveNoWait(); + message = (TextMessage) durSub.receive(1000); if (message != null) { fail("Received unexpected message "); @@ -1482,7 +1485,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug(" use dursub1 to receive all the 3 messages"); for (int i = 1; i <= 3; i++) { - message = (TextMessage) durSub1.receiveNoWait(); + message = (TextMessage) durSub1.receive(1000); if (message == null) { _logger.debug("no message received "); @@ -1499,12 +1502,12 @@ public class TopicTest extends AbstractXATestCase producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.send(_message); stSession.commit(); - message = (TextMessage) durSub.receiveNoWait(); + message = (TextMessage) durSub.receive(1000); if (message == null) { fail("message not received "); } - message = (TextMessage) durSub1.receiveNoWait(); + message = (TextMessage) durSub1.receive(1000); if (message == null) { fail("message not received "); @@ -1528,7 +1531,7 @@ public class TopicTest extends AbstractXATestCase _logger.debug(" use dursub to receive all the 3 messages"); for (int i = 1; i <= 3; i++) { - message = (TextMessage) durSub.receiveNoWait(); + message = (TextMessage) durSub.receive(1000); if (message == null) { System.out.println("no message received "); |