diff options
Diffstat (limited to 'qpid/java/systests/src/main/java')
23 files changed, 3320 insertions, 686 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java index 52120019f5..e7975f8d24 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java @@ -78,15 +78,23 @@ public class BrokerStartupTest extends AbstractTestLogging // Add an invalid value _broker += " -l invalid"; - // The release-bin build of the broker uses this log4j configuration - // so set up the broker environment to use it for this test. - // Also include -Dlog4j.debug so we can validate that it picked up this config - setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug -Dlog4j.configuration=file:" + System.getProperty(QPID_HOME) + "/../broker/src/main/java/log4j.properties"); + // The broker has a built in default log4j configuration set up + // so if the the broker cannot load the -l value it will use default + // use this default. Test that this is correctly loaded, by + // including -Dlog4j.debug so we can validate. + setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug"); // Disable all client logging so we can test for broker DEBUG only. - Logger.getRootLogger().setLevel(Level.WARN); - Logger.getLogger("qpid.protocol").setLevel(Level.WARN); - Logger.getLogger("org.apache.qpid").setLevel(Level.WARN); + setLoggerLevel(Logger.getRootLogger(), Level.WARN); + setLoggerLevel(Logger.getLogger("qpid.protocol"), Level.WARN); + setLoggerLevel(Logger.getLogger("org.apache.qpid"), Level.WARN); + + // Set the broker to use info level logging, which is the qpid-server + // default. Rather than debug which is the test default. + setBrokerOnlySystemProperty("amqj.server.logging.level", "info"); + // Set the logging defaults to info for this test. + setBrokerOnlySystemProperty("amqj.logging.level", "info"); + setBrokerOnlySystemProperty("root.logging.level", "info"); startBroker(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java new file mode 100644 index 0000000000..f1a1c1a9a8 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java @@ -0,0 +1,330 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.failover; + +import org.apache.mina.common.WriteTimeoutException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.test.utils.FailoverBaseCase; +import org.apache.qpid.AMQConnectionClosedException; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Test case based on user reported error. + * + * Summary: + * A user has reported message loss from their application. On bouncing of + * the broker the 'lost' messages are delivered to the broker. + * + * Note: + * The client was using Spring so that may influence the situation. + * + * Issue: + * The log files show 7 instances of the following which result in 7 + * missing messages. + * + * The client log files show: + * + * The broker log file show: + * + * + * 7 missing messages have delivery tags 5-11. Which says that they are + * sequentially the next message from the broker. + * + * The only way for the 'without a handler' log to occur is if the consumer + * has been removed from the look up table of the dispatcher. + * And the only way for the 'null message' log to occur on the broker is is + * if the message does not exist in the unacked-map + * + * The consumer is only removed from the list during session + * closure and failover. + * + * If the session was closed then the broker would requeue the unacked + * messages so the potential exists to have an empty map but the broker + * will not send a message out after the unacked map has been cleared. + * + * When failover occurs the _consumer map is cleared and the consumers are + * resubscribed. This is down without first stopping any existing + * dispatcher so there exists the potential to receive a message after + * the _consumer map has been cleared which is how the 'without a handler' + * log statement occurs. + * + * Scenario: + * + * Looking over logs the sequence that best fits the events is as follows: + * - Something causes Mina to be delayed causing the WriteTimoutException. + * - This exception is recevied by AMQProtocolHandler#exceptionCaught + * - As the WriteTimeoutException is an IOException this will cause + * sessionClosed to be called to start failover. + * + This is potentially the issues here. All IOExceptions are treated + * as connection failure events. + * - Failover Runs + * + Failover assumes that the previous connection has been closed. + * + Failover binds the existing objects (AMQConnection/Session) to the + * new connection objects. + * - Everything is reported as being successfully failed over. + * However, what is neglected is that the original connection has not + * been closed. + * + So what occurs is that the broker sends a message to the consumer on + * the original connection, as it was not notified of the client + * failing over. + * As the client failover reuses the original AMQSession and Dispatcher + * the new messages the broker sends to the old consumer arrives at the + * client and is processed by the same AMQSession and Dispatcher. + * However, as the failover process cleared the _consumer map and + * resubscribe the consumers the Dispatcher does not recognise the + * delivery tag and so logs the 'without a handler' message. + * - The Dispatcher then attempts to reject the message, however, + * + The AMQSession/Dispatcher pair have been swapped to using a new Mina + * ProtocolSession as part of the failover process so the reject is + * sent down the second connection. The broker receives the Reject + * request but as the Message was sent on a different connection the + * unacknowledgemap is empty and a 'message is null' log message + * produced. + * + * Test Strategy: + * + * It should be easy to demonstrate if we can send an IOException to + * AMQProtocolHandler#exceptionCaught and then try sending a message. + * + * The current unknowns here are the type of consumers that are in use. + * If it was an exclusive queue(Durable Subscription) then why did the + * resubscribe not fail. + * + * If it was not exclusive then why did the messages not round robin? + */ +public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener +{ + private CountDownLatch _failoverOccured = new CountDownLatch(1); + AMQConnection _connection; + Session _session; + Queue _queue; + MessageConsumer _consumer; + + public void setUp() throws Exception + { + super.setUp(); + stopBroker(getFailingPort()); + + } + + /** + * Test Summary: + * + * Create a queue consumer and send 10 messages to the broker. + * + * Consume the first message. + * This will pull the rest into the prefetch + * + * Send an IOException to the MinaProtocolHandler. + * + * This will force failover to occur. + * + * 9 messages would normally be expected but it is expected that none will + * arrive. As they are still in the prefetch of the first session. + * + * To free the messages we need to close all connections. + * - Simply doing connection.close() and retesting will not be enough as + * the original connection's IO layer will still exist and is nolonger + * connected to the connection object as a result of failover. + * + * - Test will need to retain a reference to the original connection IO so + * that it can be closed releasing the messages to validate that the + * messages have indeed been 'lost' on that sesssion. + */ + public void test() throws Exception + { + initialiseConnection(); + + // Create Producer + // Send 10 messages + List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10); + + // Consume first messasge + Message received = _consumer.receive(2000); + + // Verify received messages + assertNotNull("First message not received.", received); + assertEquals("Incorrect message Received", + messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + + // Allow ack to be sent to broker, by performing a synchronous command + // along the session. +// _session.createConsumer(_session.createTemporaryQueue()).close(); + + //Retain IO Layer + AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); + + // Send IO Exception - causing failover + _connection.getProtocolHandler(). + exception(new WriteTimeoutException("WriteTimeoutException to cause failover.")); + + // Verify Failover occured through ConnectionListener + assertTrue("Failover did not occur", + _failoverOccured.await(4000, TimeUnit.MILLISECONDS)); + + //Verify new protocolSession is not the same as the original + assertNotSame("Protocol Session has not changed", + protocolSession, + _connection.getProtocolHandler().getProtocolSession()); + + /***********************************/ + // This verifies that the bug has been resolved + + // Attempt to consume again. Expect 9 messages + for (int count = 1; count < 10; count++) + { + received = _consumer.receive(2000); + assertNotNull("Expected message not received:" + count, received); + assertEquals(messages.remove(0).getIntProperty("count"), + received.getIntProperty("count")); + } + + //Verify there are no more messages + received = _consumer.receive(1000); + assertNull("Message receieved when there should be none:" + received, + received); + +// /***********************************/ +// // This verifies that the bug exists +// +// // Attempt to consume remaining 9 messages.. Expecting NONE. +// // receiving just one message should fail so no need to fail 9 times +// received = _consumer.receive(1000); +// assertNull("Message receieved when it should be null:" + received, received); +// +//// //Close the Connection which you would assume would free the messages +//// _connection.close(); +//// +//// // Reconnect +//// initialiseConnection(); +//// +//// // We should still be unable to receive messages +//// received = _consumer.receive(1000); +//// assertNull("Message receieved when it should be null:" + received, received); +//// +//// _connection.close(); +// +// // Close original IO layer. Expecting messages to be released +// protocolSession.closeProtocolSession(); +// +// // Reconnect and all should be good. +//// initialiseConnection(); +// +// // Attempt to consume again. Expect 9 messages +// for (int count = 1; count < 10; count++) +// { +// received = _consumer.receive(2000); +// assertNotNull("Expected message not received:" + count, received); +// assertEquals(messages.remove(0).getIntProperty("count"), +// received.getIntProperty("count")); +// } +// +// //Verify there are no more messages +// received = _consumer.receive(1000); +// assertNull("Message receieved when there should be none:" + received, +// received); + } + + private void initialiseConnection() + throws Exception + { + //Create Connection + _connection = (AMQConnection) getConnection(); + _connection.setConnectionListener(this); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _queue = _session.createQueue(getTestQueueName()); + + // Create Consumer + _consumer = _session.createConsumer(_queue); + + //Start connection + _connection.start(); + } + + /** QpidTestCase back port to this release */ + + // modified from QTC as sendMessage is not testable. + // - should be renamed sendBlankBytesMessage + // - should be renamed sendNumberedBytesMessage + public List<Message> sendNumberedBytesMessage(Session session, Destination destination, + int count) throws Exception + { + List<Message> messages = new ArrayList<Message>(count); + + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < count; i++) + { + Message next = session.createMessage(); + + next.setIntProperty("count", i); + + producer.send(next); + + messages.add(next); + } + + producer.close(); + return messages; + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover to occur + return true; + } + + public boolean preResubscribe() + { + //Allow failover to occur + return true; + } + + public void failoverComplete() + { + _failoverOccured.countDown(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java index 4f50aba61d..b00a71315e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java @@ -160,7 +160,7 @@ public class BrokerLoggingTest extends AbstractTestLogging // Set the broker.ready string to check for the _log4j default that // is still present on standard out. - System.setProperty(BROKER_READY, "Qpid Broker Ready"); + setTestClientSystemProperty(BROKER_READY, "Qpid Broker Ready"); startBroker(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java index dfb5cde247..83f0f87bc5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java @@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit; */ public class DeepQueueConsumeWithSelector extends QpidTestCase implements MessageListener { - private static final String INDEX = "index"; private static final int MESSAGE_COUNT = 10000; private static final int BATCH_SIZE = MESSAGE_COUNT / 10; @@ -129,9 +128,7 @@ public class DeepQueueConsumeWithSelector extends QpidTestCase implements Messag @Override public Message createNextMessage(Session session, int msgCount) throws JMSException { - Message message = session.createTextMessage("Message :" + msgCount); - - message.setIntProperty(INDEX, msgCount); + Message message = super.createNextMessage(session,msgCount); if ((msgCount % BATCH_SIZE) == 0 ) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java new file mode 100644 index 0000000000..f220760511 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -0,0 +1,393 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.framing.AMQShortString; + +import javax.jms.*; +import javax.naming.NamingException; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; + +public class ProducerFlowControlTest extends AbstractTestLogging +{ + private static final int TIMEOUT = 1500; + + + private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); + + protected final String QUEUE = "ProducerFlowControl"; + + private static final int MSG_COUNT = 50; + + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + + private MessageConsumer consumer; + private final AtomicInteger _sentMessages = new AtomicInteger(); + + public void setUp() throws Exception + { + super.setUp(); + + _monitor.reset(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + public void tearDown() throws Exception + { + producerConnection.close(); + consumerConnection.close(); + super.tearDown(); + } + + public void testCapacityExceededCausesBlock() + throws JMSException, NamingException, AMQException, InterruptedException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get()); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); + + } + + public void testBrokerLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + List<String> results = _monitor.findMatches("QUE-1003"); + + assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + while(consumer.receive(1000) != null); + + results = _monitor.findMatches("QUE-1004"); + + assertEquals("Did not find correct number of QUE_1004 queue underfull messages", 1, results.size()); + + + + } + + + public void testClientLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); + setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + List<String> results = _monitor.findMatches("Message send delayed by"); + assertEquals("Incorrect number of delay messages logged by client",3,results.size()); + results = _monitor.findMatches("Message send failed due to timeout waiting on broker enforced flow control"); + assertEquals("Incorrect number of send failure messages logged by client",1,results.size()); + + + + } + + + public void testFlowControlOnCapacityResumeEqual() + throws JMSException, NamingException, AMQException, InterruptedException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",1000); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get()); + + + } + + + public void testFlowControlSoak() + throws Exception, NamingException, AMQException, InterruptedException + { + _sentMessages.set(0); + final int numProducers = 10; + final int numMessages = 100; + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",6000); + arguments.put("x-qpid-flow-resume-capacity",3000); + + ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments); + + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); + consumerConnection.start(); + + Connection[] producers = new Connection[numProducers]; + for(int i = 0 ; i < numProducers; i ++) + { + + producers[i] = getConnection(); + producers[i].start(); + Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer myproducer = session.createProducer(queue); + MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L); + } + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + for(int j = 0; j < numProducers * numMessages; j++) + { + + Message msg = consumer.receive(5000); + Thread.sleep(50L); + assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg); + + } + + + + Message msg = consumer.receive(500); + assertNull("extra message received", msg); + + + for(int i = 0; i < numProducers; i++) + { + producers[i].close(); + } + + } + + + + public void testSendTimeout() + throws JMSException, NamingException, AMQException, InterruptedException + { + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + + Exception e = sender.getException(); + + assertNotNull("No timeout exception on sending", e); + + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod); + new Thread(sender).start(); + return sender; + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + _sentMessages.incrementAndGet(); + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + } + } + } + + private static final byte[] BYTE_300 = new byte[300]; + + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_300); + send.setIntProperty("msg", msg); + + return send; + } + + + private class MessageSender implements Runnable + { + private final MessageProducer _producer; + private final Session _producerSession; + private final int _numMessages; + + + + private JMSException _exception; + private long _sleepPeriod; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + { + _producer = producer; + _producerSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + } + + public void run() + { + try + { + sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod); + } + catch (JMSException e) + { + _exception = e; + } + } + + public JMSException getException() + { + return _exception; + } + } +}
\ No newline at end of file diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java index bb7b5efc75..3e5470d5cb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java @@ -21,27 +21,34 @@ package org.apache.qpid.server.security.acl; - -import junit.framework.TestCase; - -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.*; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.AMQConnectionFailureException; +import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQException; -import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; + -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.naming.NamingException; - import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -54,6 +61,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void setUp() throws Exception { + //Performing setUp here would result in a broker with the default ACL test config + + //Each test now calls the private setUpACLTest to allow them to make + //individual customisations to the base ACL settings + } + + private void setUpACLTest() throws Exception + { final String QPID_HOME = System.getProperty("QPID_HOME"); if (QPID_HOME == null) @@ -73,8 +88,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E return "amqp://" + username + ":" + password + "@clientid/test?brokerlist='" + getBroker() + "?retries='0''"; } - public void testAccessAuthorized() throws AMQException, URLSyntaxException + public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -96,6 +113,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testAccessNoRights() throws Exception { + setUpACLTest(); + try { Connection conn = getConnection("guest", "guest"); @@ -120,8 +139,40 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException + public void testGuestConsumeWithCreateRightsAndWithoutConsumeRights() throws NamingException, ConfigurationException, IOException, Exception + { + //Customise the ACL config to give the guest user some create (could be any, non-consume) rights to + //force creation of a PrincipalPermissions instance to perform the consume rights check against. + setConfigurationProperty("virtualhosts.virtualhost.test.security.access_control_list.create.queues.queue.users.user", "guest"); + + setUpACLTest(); + + try + { + Connection conn = getConnection("guest", "guest"); + + Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + conn.start(); + + sesh.createConsumer(sesh.createQueue("example.RequestQueue")); + + conn.close(); + } + catch (JMSException e) + { + Throwable cause = e.getLinkedException(); + + assertNotNull("There was no liked exception", cause); + assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass()); + assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); + } + } + + public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -142,6 +193,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -167,8 +220,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException + public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -191,6 +246,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -212,8 +269,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException + public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -239,8 +298,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E } } - public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException + public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -271,6 +332,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -311,8 +374,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException); } - public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException + public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -333,6 +398,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("client", "guest"); @@ -358,6 +425,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -391,8 +460,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E return (Connection) connection; } - public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException + public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -414,6 +485,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -436,6 +509,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); @@ -461,6 +536,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception { + setUpACLTest(); + Connection connection = null; try { @@ -492,6 +569,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E */ public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception { + setUpACLTest(); + //Set up the Server Connection serverConnection = getConnection("server", "guest"); @@ -572,6 +651,8 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception { + setUpACLTest(); + try { Connection conn = getConnection("server", "guest"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index 62b54d3086..f9cf48a2b1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -61,7 +61,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase setupSession(); - _queue = _clientSession.createQueue(getName()+System.currentTimeMillis()); + _queue = _clientSession.createQueue(getTestQueueName()); _clientSession.createConsumer(_queue).close(); //Ensure there are no messages on the queue to start with. @@ -497,7 +497,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase if (msgCount == failPoint) { - failBroker(); + failBroker(getFailingPort()); } } @@ -529,7 +529,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase sendMessages("connection2", messages); } - failBroker(); + failBroker(getFailingPort()); checkQueueDepth(messages); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java index 39e2b892a9..ff766c907d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java @@ -22,64 +22,172 @@ package org.apache.qpid.test.client; import org.apache.qpid.test.utils.*; import javax.jms.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import junit.framework.ComparisonFailure; +import junit.framework.AssertionFailedError; /** - * RollbackOrderTest + * RollbackOrderTest, QPID-1864, QPID-1871 + * + * Description: + * + * The problem that this test is exposing is that the dispatcher used to be capable + * of holding on to a message when stopped. This ment that when the rollback was + * called and the dispatcher stopped it may have hold of a message. So after all + * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue) + * have been cleared the client still had a single message, the one the + * dispatcher was holding on to. + * + * As a result the TxRollback operation would run and then release the dispatcher. + * Whilst the dispatcher would then proceed to reject the message it was holiding + * the Broker would already have resent that message so the rejection would silently + * fail. + * + * And the client would receieve that single message 'early', depending on the + * number of messages already recevied when rollback was called. + * + * + * Aims: + * + * The tests puts 50 messages on to the queue. + * + * The test then tries to cause the dispatcher to stop whilst it is in the process + * of moving a message from the preDeliveryQueue to a consumers sychronousQueue. + * + * To exercise this path we have 50 message flowing to the client to give the + * dispatcher a bit of work to do moving messages. + * + * Then we loop - 10 times + * - Validating that the first message received is always message 1. + * - Receive a few more so that there are a few messages to reject. + * - call rollback, to try and catch the dispatcher mid process. + * + * Outcome: + * + * The hope is that we catch the dispatcher mid process and cause a BasicReject + * to fail. Which will be indicated in the log but will also cause that failed + * rejected message to be the next to be delivered which will not be message 1 + * as expected. + * + * We are testing a race condition here but we can check through the log file if + * the race condition occured. However, performing that check will only validate + * the problem exists and will not be suitable as part of a system test. * */ - public class RollbackOrderTest extends QpidTestCase { - private Connection conn; - private Queue queue; - private Session ssn; - private MessageProducer prod; - private MessageConsumer cons; + private Connection _connection; + private Queue _queue; + private Session _session; + private MessageConsumer _consumer; @Override public void setUp() throws Exception { super.setUp(); - conn = getConnection(); - conn.start(); - ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); - queue = ssn.createQueue("rollback-order-test-queue"); - prod = ssn.createProducer(queue); - cons = ssn.createConsumer(queue); - for (int i = 0; i < 5; i++) - { - TextMessage msg = ssn.createTextMessage("message " + (i+1)); - prod.send(msg); - } - ssn.commit(); + _connection = getConnection(); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _queue = _session.createQueue(getTestQueueName()); + _consumer = _session.createConsumer(_queue); + + //Send more messages so it is more likely that the dispatcher is + // processing on rollback. + sendMessage(_session, _queue, 50); + _session.commit(); + } public void testOrderingAfterRollback() throws Exception { - for (int i = 0; i < 10; i++) + //Start the session now so we + _connection.start(); + + for (int i = 0; i < 20; i++) { - TextMessage msg = (TextMessage) cons.receive(); - assertEquals("message 1", msg.getText()); - ssn.rollback(); + Message msg = _consumer.receive(); + assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX)); + + // Pull additional messages through so we have some reject work to do + for (int m=0; m < 5 ; m++) + { + _consumer.receive(); + } + + System.err.println("ROT-Rollback"); + _logger.warn("ROT-Rollback"); + _session.rollback(); } } - @Override public void tearDown() throws Exception + public void testOrderingAfterRollbackOnMessage() throws Exception { - while (true) + final CountDownLatch count= new CountDownLatch(20); + final Exception exceptions[] = new Exception[20]; + final AtomicBoolean failed = new AtomicBoolean(false); + + _consumer.setMessageListener(new MessageListener() { - Message msg = cons.receiveNoWait(); - if (msg == null) + + public void onMessage(Message message) { - break; + + Message msg = message; + try + { + count.countDown(); + assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX)); + + _session.rollback(); + } + catch (JMSException e) + { + System.out.println("Error:" + e.getMessage()); + exceptions[(int)count.getCount()] = e; + } + catch (AssertionFailedError cf) + { + // End Test if Equality test fails + while (count.getCount() != 0) + { + count.countDown(); + } + + System.out.println("Error:" + cf.getMessage()); + System.err.println(cf.getMessage()); + cf.printStackTrace(); + failed.set(true); + } } - else + }); + //Start the session now so we + _connection.start(); + + count.await(); + + for (Exception e : exceptions) + { + if (e != null) { - msg.acknowledge(); + System.err.println(e.getMessage()); + e.printStackTrace(); + failed.set(true); } } - ssn.commit(); + +// _consumer.close(); + _connection.close(); + + assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get()); + } + + @Override public void tearDown() throws Exception + { + + drainQueue(_queue); + super.tearDown(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index dfc3bb7b42..c307176f3f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -37,7 +37,6 @@ import javax.naming.NamingException; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; @@ -58,13 +57,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener private Session consumerSession; private MessageConsumer consumer; - private static int usedBrokers = 0; private CountDownLatch failoverComplete; - private static final long DEFAULT_FAILOVER_TIME = 10000L; private boolean CLUSTERED = Boolean.getBoolean("profile.clustered"); private int seed; private Random rand; - + private int _currentPort = getFailingPort(); + @Override protected void setUp() throws Exception { @@ -227,7 +225,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener _logger.info("Failing over"); - causeFailure(DEFAULT_FAILOVER_TIME); + causeFailure(_currentPort, DEFAULT_FAILOVER_TIME); // Check that you produce and consume the rest of messages. _logger.debug("=================="); @@ -242,10 +240,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener _logger.debug("=================="); } - private void causeFailure(long delay) + private void causeFailure(int port, long delay) { - failBroker(); + failBroker(port); _logger.info("Awaiting Failover completion"); try @@ -268,7 +266,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener Message msg = consumer.receive(); assertNotNull("Expected msgs not received", msg); - causeFailure(DEFAULT_FAILOVER_TIME); + causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME); Exception failure = null; try @@ -314,7 +312,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000; //Fail the first broker - causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME); + causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME); //Reconnection should occur assertTrue("Failover did not take long enough", System.nanoTime() > failTime); @@ -344,15 +342,15 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener _logger.debug("==================================================================="); runP2PFailover(numMessages, false,false, false); - startBroker(getFailingPort()); + startBroker(_currentPort); if (useAltPort) { - setFailingPort(altPort); + _currentPort = altPort; useAltPort = false; } else { - setFailingPort(stdPort); + _currentPort = stdPort; useAltPort = true; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index 5a5e23baa5..a09589b121 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -1,31 +1,248 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.test.client.message; -import javax.jms.Connection; -import javax.jms.Destination; +import java.util.concurrent.CountDownLatch; + +import javax.jms.DeliveryMode; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import junit.framework.Assert; -import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.test.utils.QpidTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class SelectorTest extends QpidTestCase +public class SelectorTest extends QpidTestCase implements MessageListener { - private static final Logger _logger = Logger.getLogger(SelectorTest.class); + private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class); - public void testSelectorWithJMSMessageID() throws Exception + private AMQConnection _connection; + private AMQDestination _destination; + private int count; + public String _connectionString = "vm://:1"; + private static final String INVALID_SELECTOR = "Cost LIKE 5"; + CountDownLatch _responseLatch = new CountDownLatch(1); + + private static final String BAD_MATHS_SELECTOR = " 1 % 5"; + + private static final long RECIEVE_TIMEOUT = 1000; + + protected void setUp() throws Exception + { + super.setUp(); + init((AMQConnection) getConnection("guest", "guest")); + } + + private void init(AMQConnection connection) throws JMSException + { + init(connection, new AMQQueue(connection, getTestQueueName(), true)); + } + + private void init(AMQConnection connection, AMQDestination destination) throws JMSException + { + _connection = connection; + _destination = destination; + connection.start(); + } + + public void onMessage(Message message) + { + count++; + _logger.info("Got Message:" + message); + _responseLatch.countDown(); + } + + public void testUsingOnMessage() throws Exception + { + String selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; + // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; + + Session session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + // _session.createConsumer(destination).setMessageListener(this); + session.createConsumer(_destination, selector).setMessageListener(this); + + try + { + Message msg = session.createTextMessage("Message"); + msg.setJMSPriority(1); + msg.setIntProperty("Cost", 2); + msg.setStringProperty("property-with-hyphen", "wibble"); + msg.setJMSType("Special"); + + _logger.info("Sending Message:" + msg); + + ((BasicMessageProducer) session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT); + _logger.info("Message sent, waiting for response..."); + + _responseLatch.await(); + + if (count > 0) + { + _logger.info("Got message"); + } + + if (count == 0) + { + fail("Did not get message!"); + // throw new RuntimeException("Did not get message!"); + } + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + System.out.println("SUCCESS!!"); + } + } + catch (InterruptedException e) + { + _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage()); + } + + } + + public void testUnparsableSelectors() throws Exception { - Connection conn = getConnection(); - conn.start(); - Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); + AMQSession session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + boolean caught = false; - Destination dest = session.createQueue("SelectorQueue"); + //Try Creating a Browser + try + { + session.createBrowser(session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + caught = true; + } + assertTrue("No exception thrown!", caught); + caught = false; + + //Try Creating a Consumer + try + { + session.createConsumer(session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + caught = true; + } + assertTrue("No exception thrown!", caught); + caught = false; + + //Try Creating a Receiever + try + { + session.createReceiver(session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + caught = true; + } + assertTrue("No exception thrown!", caught); + caught = false; + + try + { + session.createReceiver(session.createQueue("Ping"), BAD_MATHS_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + caught = true; + } + assertTrue("No exception thrown!", caught); + caught = false; - MessageProducer prod = session.createProducer(dest); - MessageConsumer consumer = session.createConsumer(dest,"JMSMessageID IS NOT NULL"); + } + + public void testRuntimeSelectorError() throws JMSException + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1"); + MessageProducer producer = session.createProducer(_destination); + Message sentMsg = session.createTextMessage(); + + sentMsg.setIntProperty("testproperty", 1); // 1 % 5 + producer.send(sentMsg); + Message recvd = consumer.receive(RECIEVE_TIMEOUT); + assertNotNull(recvd); + + sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense + producer.send(sentMsg); + try + { + recvd = consumer.receive(RECIEVE_TIMEOUT); + assertNull(recvd); + } + catch (Exception e) + { + + } + assertTrue("Connection should be closed", _connection.isClosed()); + } + + public void testSelectorWithJMSMessageID() throws Exception + { + Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer prod = session.createProducer(_destination); + MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL"); for (int i=0; i<2; i++) { @@ -54,7 +271,7 @@ public class SelectorTest extends QpidTestCase Message msg3 = consumer.receive(1000); Assert.assertNull("Msg3 should be null", msg3); session.commit(); - consumer = session.createConsumer(dest,"JMSMessageID IS NULL"); + consumer = session.createConsumer(_destination,"JMSMessageID IS NULL"); Message msg4 = consumer.receive(1000); Message msg5 = consumer.receive(1000); @@ -62,4 +279,32 @@ public class SelectorTest extends QpidTestCase Assert.assertNotNull("Msg4 should not be null", msg4); Assert.assertNotNull("Msg5 should not be null", msg5); } + + public static void main(String[] argv) throws Exception + { + SelectorTest test = new SelectorTest(); + test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0]; + + try + { + while (true) + { + if (test._connectionString.contains("vm://:1")) + { + test.setUp(); + } + test.testUsingOnMessage(); + + if (test._connectionString.contains("vm://:1")) + { + test.tearDown(); + } + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + e.printStackTrace(); + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java new file mode 100644 index 0000000000..4b45a96c20 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.FailoverBaseCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +public class Acknowledge2ConsumersTest extends FailoverBaseCase +{ + protected static int NUM_MESSAGES = 100; + protected Connection _con; + protected Queue _queue; + private Session _producerSession; + private Session _consumerSession; + private MessageConsumer _consumerA; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _queue = (Queue) getInitialContext().lookup("queue"); + + //Create Producer put some messages on the queue + _con = getConnection(); + } + + private void init(boolean transacted, int mode) throws JMSException + { + _producerSession = _con.createSession(true, Session.SESSION_TRANSACTED); + _consumerSession = _con.createSession(transacted, mode); + _consumerA = _consumerSession.createConsumer(_queue); + _con.start(); + } + + /** + * Produces Messages that + * + * @param transacted + * @param mode + * + * @throws Exception + */ + private void test2ConsumersAcking(boolean transacted, int mode) throws Exception + { + init(transacted, mode); + + // These should all end up being prefetched by sessionA + sendMessage(_producerSession, _queue, NUM_MESSAGES / 2); + + //Create a second consumer (consumerB) to consume some of the messages + MessageConsumer consumerB = _consumerSession.createConsumer(_queue); + + // These messages should be roundrobined between A and B + sendMessage(_producerSession, _queue, NUM_MESSAGES / 2); + + int count = 0; + //Use consumerB to receive messages it has + Message msg = consumerB.receive(1500); + while (msg != null) + { + if (mode == Session.CLIENT_ACKNOWLEDGE) + { + msg.acknowledge(); + } + count++; + msg = consumerB.receive(1500); + } + if (transacted) + { + _consumerSession.commit(); + } + + // Close the consumers + _consumerA.close(); + consumerB.close(); + + // and close the session to release any prefetched messages. + _consumerSession.close(); + assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, + ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue)); + + // Clean up messages that may be left on the queue + _consumerSession = _con.createSession(transacted, mode); + _consumerA = _consumerSession.createConsumer(_queue); + msg = _consumerA.receive(1500); + while (msg != null) + { + if (mode == Session.CLIENT_ACKNOWLEDGE) + { + msg.acknowledge(); + } + msg = _consumerA.receive(1500); + } + _consumerA.close(); + if (transacted) + { + _consumerSession.commit(); + } + _consumerSession.close(); + } + + public void test2ConsumersAutoAck() throws Exception + { + test2ConsumersAcking(false, Session.AUTO_ACKNOWLEDGE); + } + + public void test2ConsumersClientAck() throws Exception + { + test2ConsumersAcking(false, Session.CLIENT_ACKNOWLEDGE); + } + + public void test2ConsumersTx() throws Exception + { + test2ConsumersAcking(true, Session.SESSION_TRANSACTED); + } + + + +// +// /** +// * Check that session level acknowledge does correctly ack all previous +// * values. Send 3 messages(0,1,2) then ack 1 and 2. If session ack is +// * working correctly then acking 1 will also ack 0. Acking 2 will not +// * attempt to re-ack 0 and 1. +// * +// * @throws Exception +// */ +// public void testSessionAck() throws Exception +// { +// init(false, Session.CLIENT_ACKNOWLEDGE); +// +// sendMessage(_producerSession, _queue, 3); +// Message msg; +// +// // Drop msg 0 +// _consumerA.receive(RECEIVE_TIMEOUT); +// +// // Take msg 1 +// msg = _consumerA.receive(RECEIVE_TIMEOUT); +// +// assertNotNull("Message 1 not correctly received.", msg); +// assertEquals("Incorrect message received", 1, msg.getIntProperty(INDEX)); +// +// // This should also ack msg 0 +// msg.acknowledge(); +// +// // Take msg 2 +// msg = _consumerA.receive(RECEIVE_TIMEOUT); +// +// assertNotNull("Message 2 not correctly received.", msg); +// assertEquals("Incorrect message received", 2, msg.getIntProperty(INDEX)); +// +// // This should just ack msg 2 +// msg.acknowledge(); +// +// _consumerA.close(); +// _consumerSession.close(); +// +// assertEquals("Queue not empty.", 0, +// ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue)); +// _con.close(); +// +// +// } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java new file mode 100644 index 0000000000..f22a405fc3 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java @@ -0,0 +1,356 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TransactionRolledBackException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener +{ + + protected CountDownLatch _failoverCompleted = new CountDownLatch(1); + private MessageListener _listener = null; + + @Override + public void setUp() throws Exception + { + super.setUp(); + NUM_MESSAGES = 10; + } + + /** + * Override default init to add connectionListener so we can verify that + * failover took place + * + * @param transacted create a transacted session for this test + * @param mode if not transacted what ack mode to use for this test + * + * @throws Exception if a problem occured during test setup. + */ + @Override + public void init(boolean transacted, int mode) throws Exception + { + super.init(transacted, mode); + ((AMQConnection) _connection).setConnectionListener(this); + // Override the listener for the dirtyAck testing. + if (_listener != null) + { + _consumer.setMessageListener(_listener); + } + } + + protected void prepBroker(int count) throws Exception + { + //Stop the connection whilst we repopulate the broker, or the no_ack + // test will drain the msgs before we can check we put the right number + // back on again. +// _connection.stop(); + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, count, NUM_MESSAGES - count, 0); + + if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) + { + assertEquals("Wrong number of messages on queue", count, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + } + + connection.close(); + +// _connection.start(); + } + + @Override + public void doAcknowlegement(Message msg) throws JMSException + { + //Acknowledge current message + super.doAcknowlegement(msg); + + int msgCount = msg.getIntProperty(INDEX); + + if (msgCount % 2 == 0) + { + failBroker(getFailingPort()); + } + else + { + failBroker(getPort()); + } + + try + { + prepBroker(NUM_MESSAGES - msgCount - 1); + } + catch (Exception e) + { + fail("Unable to prep new broker," + e.getMessage()); + } + + try + { + + if (msgCount % 2 == 0) + { + startBroker(getFailingPort()); + } + else + { + startBroker(getPort()); + } + } + catch (Exception e) + { + fail("Unable to start failover broker," + e.getMessage()); + } + + } + + int msgCount = 0; + boolean cleaned = false; + + class DirtyAckingHandler implements MessageListener + { + /** + * Validate first message but do nothing with it. + * + * Failover + * + * The receive the message again + * + * @param message + */ + public void onMessage(Message message) + { + // Stop processing if we have an error and had to stop running. + if (_receviedAll.getCount() == 0) + { + _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message); + return; + } + + try + { + // Check we have the next message as expected + assertNotNull("Message " + msgCount + " not correctly received.", message); + assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX)); + + if (msgCount == 0 && _failoverCompleted.getCount() != 0) + { + // This is the first message we've received so lets fail the broker + + failBroker(getFailingPort()); + + repopulateBroker(); + + _logger.error("Received first msg so failing over"); + + return; + } + + msgCount++; + + // Don't acknowlege the first message after failover so we can commit + // them together + if (msgCount == 1) + { + _logger.error("Received first msg after failover ignoring:" + msgCount); + + // Acknowledge the first message if we are now on the cleaned pass + if (cleaned) + { + _receviedAll.countDown(); + } + + return; + } + + if (_consumerSession.getTransacted()) + { + try + { + _consumerSession.commit(); + if (!cleaned) + { + fail("Session is dirty we should get an TransactionRolledBackException"); + } + } + catch (TransactionRolledBackException trbe) + { + //expected path + } + } + else + { + try + { + message.acknowledge(); + if (!cleaned) + { + fail("Session is dirty we should get an IllegalStateException"); + } + } + catch (javax.jms.IllegalStateException ise) + { + assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage()); + // Recover the sesion and try again. + _consumerSession.recover(); + } + } + + // Acknowledge the last message if we are in a clean state + // this will then trigger test teardown. + if (cleaned) + { + _receviedAll.countDown(); + } + + //Reset message count so we can try again. + msgCount = 0; + cleaned = true; + } + catch (Exception e) + { + // If something goes wrong stop and notifiy main thread. + fail(e); + } + } + } + + /** + * Test that Acking/Committing a message received before failover causes + * an exception at commit/ack time. + * + * Expected behaviour is that in: + * * tx mode commit() throws a transacted RolledBackException + * * client ack mode throws an IllegalStateException + * + * @param transacted is this session trasacted + * @param mode What ack mode should be used if not trasacted + * + * @throws Exception if something goes wrong. + */ + protected void testDirtyAcking(boolean transacted, int mode) throws Exception + { + NUM_MESSAGES = 2; + _listener = new DirtyAckingHandler(); + + super.testAcking(transacted, mode); + } + + public void testDirtyClientAck() throws Exception + { + testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE); + } + + public void testDirtyAckingTransacted() throws Exception + { + testDirtyAcking(true, Session.SESSION_TRANSACTED); + } + + private void repopulateBroker() throws Exception + { + // Repopulate this new broker so we can test what happends after failover + + //Get the connection to the first (main port) broker. + Connection connection = getConnection(); + // Use a transaction to send messages so we can be sure they arrive. + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, NUM_MESSAGES); + + assertEquals("Wrong number of messages on queue", NUM_MESSAGES, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + + connection.close(); + } + + // AMQConnectionListener Interface.. used so we can validate that we + // actually failed over. + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover + return true; + } + + public boolean preResubscribe() + { + //Allow failover + return true; + } + + public void failoverComplete() + { + _failoverCompleted.countDown(); + } + + /** + * Override so we can block until failover has completd + * + * @param port + */ + @Override + public void failBroker(int port) + { + super.failBroker(port); + + try + { + if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + { + // Use an exception so that we use our local fail() that notifies the main thread of failure + throw new Exception("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); + } + + } + catch (Exception e) + { + // Use an exception so that we use our local fail() that notifies the main thread of failure + fail(e); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java new file mode 100644 index 0000000000..eb36522fac --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -0,0 +1,306 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TransactionRolledBackException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements ConnectionListener +{ + + protected CountDownLatch _failoverCompleted = new CountDownLatch(1); + + @Override + public void setUp() throws Exception + { + super.setUp(); + // This must be even for the test to run correctly. + // Otherwise we will kill the standby broker + // not the one we are connected to. + // The test will still pass but it will not be exactly + // as described. + NUM_MESSAGES = 6; + } + + /** + * Override default init to add connectionListener so we can verify that + * failover took place + * + * @param transacted create a transacted session for this test + * @param mode if not transacted what ack mode to use for this test + * @throws Exception if a problem occured during test setup. + */ + @Override + protected void init(boolean transacted, int mode) throws Exception + { + super.init(transacted, mode); + ((AMQConnection) _connection).setConnectionListener(this); + } + + protected void prepBroker(int count) throws Exception + { + if (count % 2 == 1) + { + failBroker(getFailingPort()); + } + else + { + failBroker(getPort()); + } + + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, count, NUM_MESSAGES - count, 0); + + if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) + { + assertEquals("Wrong number of messages on queue", count, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + } + + connection.close(); + + try + { + if (count % 2 == 1) + { + startBroker(getFailingPort()); + } + else + { + startBroker(getPort()); + } + } + catch (Exception e) + { + fail("Unable to start failover broker," + e.getMessage()); + } + } + + @Override + public void doAcknowlegement(Message msg) throws JMSException + { + //Acknowledge current message + super.doAcknowlegement(msg); + + try + { + prepBroker(NUM_MESSAGES - msg.getIntProperty(INDEX) - 1); + } + catch (Exception e) + { + fail("Unable to prep new broker," + e.getMessage()); + } + + } + + /** + * Test that Acking/Committing a message received before failover causes + * an exception at commit/ack time. + * + * Expected behaviour is that in: + * * tx mode commit() throws a transacted RolledBackException + * * client ack mode throws an IllegalStateException + * + * @param transacted is this session trasacted + * @param mode What ack mode should be used if not trasacted + * + * @throws Exception if something goes wrong. + */ + protected void testDirtyAcking(boolean transacted, int mode) throws Exception + { + NUM_MESSAGES = 2; + //Test Dirty Failover Fails + init(transacted, mode); + + _connection.start(); + + Message msg = _consumer.receive(1500); + + int count = 0; + assertNotNull("Message " + count + " not correctly received.", msg); + assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX)); + + //Don't acknowledge just prep the next broker. Without changing count + // Prep the new broker to have all all the messages so we can validate + // that they can all be correctly received. + try + { + + //Stop the connection so we can validate the number of message count + // on the queue is correct after failover + _connection.stop(); + failBroker(getFailingPort()); + + //Get the connection to the first (main port) broker. + Connection connection = getConnection();//getConnectionFactory("connection1").getConnectionURL()); + // Use a transaction to send messages so we can be sure they arrive. + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, NUM_MESSAGES); + + assertEquals("Wrong number of messages on queue", NUM_MESSAGES, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + + connection.close(); + + //restart connection + _connection.start(); + } + catch (Exception e) + { + fail("Unable to prep new broker," + e.getMessage()); + } + + // Consume the next message - don't check what it is as a normal would + // assume it is msg 1 but as we've fallen over it is msg 0 again. + msg = _consumer.receive(1500); + + if (_consumerSession.getTransacted()) + { + try + { + _consumerSession.commit(); + fail("Session is dirty we should get an TransactionRolledBackException"); + } + catch (TransactionRolledBackException trbe) + { + //expected path + } + } + else + { + try + { + msg.acknowledge(); + fail("Session is dirty we should get an IllegalStateException"); + } + catch (javax.jms.IllegalStateException ise) + { + assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage()); + // Recover the sesion and try again. + _consumerSession.recover(); + } + } + + msg = _consumer.receive(1500); + // Validate we now get the first message back + assertEquals(0, msg.getIntProperty(INDEX)); + + msg = _consumer.receive(1500); + // and the second message + assertEquals(1, msg.getIntProperty(INDEX)); + + // And now verify that we can now commit the clean session + if (_consumerSession.getTransacted()) + { + _consumerSession.commit(); + } + else + { + msg.acknowledge(); + } + + assertEquals("Wrong number of messages on queue", 0, + ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + } + + public void testDirtyClientAck() throws Exception + { + testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE); + } + + public void testDirtyAckingTransacted() throws Exception + { + testDirtyAcking(true, Session.SESSION_TRANSACTED); + } + + // AMQConnectionListener Interface.. used so we can validate that we + // actually failed over. + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover + return true; + } + + public boolean preResubscribe() + { + //Allow failover + return true; + } + + public void failoverComplete() + { + _failoverCompleted.countDown(); + } + + /** + * Override so we can block until failover has completd + * + * @param port + */ + @Override + public void failBroker(int port) + { + super.failBroker(port); + + try + { + if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + { + fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); + } + } + catch (InterruptedException e) + { + fail("Failover was interuppted"); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java new file mode 100644 index 0000000000..4254727d36 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.client.failover.FailoverException; + +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener +{ + protected CountDownLatch _receviedAll; + protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); + + @Override + public void setUp() throws Exception + { + super.setUp(); + } + + @Override + public void init(boolean transacted, int mode) throws Exception + { + _receviedAll = new CountDownLatch(NUM_MESSAGES); + + super.init(transacted, mode); + _consumer.setMessageListener(this); + } + + /** + * @param transacted + * @param mode + * + * @throws Exception + */ + protected void testAcking(boolean transacted, int mode) throws Exception + { + init(transacted, mode); + + _connection.start(); + + int lastCount = (int) _receviedAll.getCount(); + + boolean complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS); + + while (!complete) + { + int currentCount = (int) _receviedAll.getCount(); + + // make sure we have received a message in the last cycle. + if (lastCount == currentCount) + { + break; + } + // Remember the currentCount as the lastCount for the next cycle. + // so we can exit if things get locked up. + lastCount = currentCount; + + complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS); + } + + if (!complete) + { + // Check to see if we ended due to an exception in the onMessage handler + Exception cause = _causeOfFailure.get(); + if (cause != null) + { + cause.printStackTrace(); + fail(cause.getMessage()); + } + else + { + fail("All messages not received missing:" + _receviedAll.getCount() + "/" + NUM_MESSAGES); + } + } + + // Check to see if we ended due to an exception in the onMessage handler + Exception cause = _causeOfFailure.get(); + if (cause != null) + { + cause.printStackTrace(); + fail(cause.getMessage()); + } + + try + { + _consumer.close(); + } + catch (JMSAMQException amqe) + { + if (amqe.getLinkedException() instanceof FailoverException) + { + fail("QPID-143 : Auto Ack can acknowledge message from previous session after failver. If failover occurs between deliver and ack."); + } + // else Rethrow for TestCase to catch. + throw amqe; + } + + _consumerSession.close(); + + assertEquals("Wrong number of messages on queue", 0, + ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue)); + } + + public void onMessage(Message message) + { + try + { + int count = NUM_MESSAGES - (int) _receviedAll.getCount(); + + assertEquals("Incorrect message received", count, message.getIntProperty(INDEX)); + + count++; + if (count < NUM_MESSAGES) + { + //Send the next message + _producer.send(createNextMessage(_consumerSession, count)); + } + + doAcknowlegement(message); + + _receviedAll.countDown(); + } + catch (Exception e) + { + // This will end the test run by counting down _receviedAll + fail(e); + } + } + + /** + * Pass the given exception back to the waiting thread to fail the test run. + * + * @param e The exception that is causing the test to fail. + */ + protected void fail(Exception e) + { + _causeOfFailure.set(e); + // End the test. + while (_receviedAll.getCount() != 0) + { + _receviedAll.countDown(); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java index c367a0856c..7c9a77eb53 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -1,7 +1,5 @@ -package org.apache.qpid.test.unit.ack; - /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,133 +19,138 @@ package org.apache.qpid.test.unit.ack; * */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.FailoverBaseCase; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.MessageProducer; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AcknowledgeTest extends QpidTestCase +public class AcknowledgeTest extends FailoverBaseCase { - protected static int NUM_MESSAGES = 100; - protected Connection _con; + protected int NUM_MESSAGES; + protected Connection _connection; protected Queue _queue; - private MessageProducer _producer; - private Session _producerSession; - private Session _consumerSession; - private MessageConsumer _consumerA; + protected Session _consumerSession; + protected MessageConsumer _consumer; + protected MessageProducer _producer; @Override protected void setUp() throws Exception { super.setUp(); - _queue = (Queue) getInitialContext().lookup("queue"); + NUM_MESSAGES = 5; + + _queue = getTestQueue(); //Create Producer put some messages on the queue - _con = getConnection(); - _con.start(); + _connection = getConnection(); } - private void init(boolean transacted, int mode) throws JMSException { - _producerSession = _con.createSession(true, Session.AUTO_ACKNOWLEDGE); - _consumerSession = _con.createSession(transacted, mode); - _producer = _producerSession.createProducer(_queue); - _consumerA = _consumerSession.createConsumer(_queue); - } + protected void init(boolean transacted, int mode) throws Exception + { + _consumerSession = _connection.createSession(transacted, mode); + _consumer = _consumerSession.createConsumer(_queue); + _producer = _consumerSession.createProducer(_queue); + + // These should all end up being prefetched by session + sendMessage(_consumerSession, _queue, 1); + + assertEquals("Wrong number of messages on queue", 1, + ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + } /** - * Produces and consumes messages an either ack or commit the receipt of those messages - * * @param transacted * @param mode + * * @throws Exception */ - private void testMessageAck(boolean transacted, int mode) throws Exception + protected void testAcking(boolean transacted, int mode) throws Exception { - init(transacted, mode); - sendMessage(_producerSession, _queue, NUM_MESSAGES/2); - _producerSession.commit(); - MessageConsumer consumerB = _consumerSession.createConsumer(_queue); - sendMessage(_producerSession, _queue, NUM_MESSAGES/2); - _producerSession.commit(); + init(transacted, mode); + + _connection.start(); + + Message msg = _consumer.receive(1500); + int count = 0; - Message msg = consumerB.receive(1500); - while (msg != null) + while (count < NUM_MESSAGES) { - if (mode == Session.CLIENT_ACKNOWLEDGE) + assertNotNull("Message " + count + " not correctly received.", msg); + assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX)); + count++; + + if (count < NUM_MESSAGES) { - msg.acknowledge(); + //Send the next message + _producer.send(createNextMessage(_consumerSession, count)); } - count++; - msg = consumerB.receive(1500); + + doAcknowlegement(msg); + + msg = _consumer.receive(1500); } - if (transacted) - { - _consumerSession.commit(); - } - _consumerA.close(); - consumerB.close(); - _consumerSession.close(); - assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, - ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue)); - - // Clean up messages that may be left on the queue - _consumerSession = _con.createSession(transacted, mode); - _consumerA = _consumerSession.createConsumer(_queue); - msg = _consumerA.receive(1500); - while (msg != null) + + assertEquals("Wrong number of messages on queue", 0, + ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + } + + /** + * Perform the acknowledgement of messages if additionally required. + * + * @param msg + * + * @throws JMSException + */ + protected void doAcknowlegement(Message msg) throws JMSException + { + if (_consumerSession.getTransacted()) { - if (mode == Session.CLIENT_ACKNOWLEDGE) - { - msg.acknowledge(); - } - msg = _consumerA.receive(1500); + _consumerSession.commit(); } - _consumerA.close(); - if (transacted) + + if (_consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - _consumerSession.commit(); + msg.acknowledge(); } - _consumerSession.close(); } - - public void test2ConsumersAutoAck() throws Exception + + public void testClientAck() throws Exception { - testMessageAck(false, Session.AUTO_ACKNOWLEDGE); + testAcking(false, Session.CLIENT_ACKNOWLEDGE); } - public void test2ConsumersClientAck() throws Exception + public void testAutoAck() throws Exception { - testMessageAck(true, Session.CLIENT_ACKNOWLEDGE); + testAcking(false, Session.AUTO_ACKNOWLEDGE); } - - public void test2ConsumersTx() throws Exception + + public void testTransacted() throws Exception { - testMessageAck(true, Session.AUTO_ACKNOWLEDGE); + testAcking(true, Session.SESSION_TRANSACTED); } - - public void testIndividualAck() throws Exception + + public void testDupsOk() throws Exception { - init(false, Session.CLIENT_ACKNOWLEDGE); - sendMessage(_producerSession, _queue, 3); - _producerSession.commit(); - Message msg = null; - for (int i = 0; i < 2; i++) - { - msg = _consumerA.receive(RECEIVE_TIMEOUT); - ((AbstractJMSMessage)msg).acknowledgeThis(); - } - msg = _consumerA.receive(RECEIVE_TIMEOUT); - msg.acknowledge(); - _con.close(); + testAcking(false, Session.DUPS_OK_ACKNOWLEDGE); } - + + public void testNoAck() throws Exception + { + testAcking(false, AMQSession.NO_ACKNOWLEDGE); + } + + public void testPreAck() throws Exception + { + testAcking(false, AMQSession.PRE_ACKNOWLEDGE); + } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java new file mode 100644 index 0000000000..834b17430b --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.jms.Session; + +import javax.jms.Message; +import javax.jms.Queue; + +public class FailoverBeforeConsumingRecoverTest extends RecoverTest +{ + + @Override + protected void initTest() throws Exception + { + super.initTest(); + failBroker(getFailingPort()); + + Queue queue = _consumerSession.createQueue(getTestQueueName()); + sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java new file mode 100644 index 0000000000..6c4b7ba01b --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * This is a quick manual test to validate acking after failover with a + * transacted session. + * + * Start an external broker then run this test. Std Err will print. + * Sent Message: 1 + * Received Message: 1 + * + * You can then restart the external broker, which will cause failover, which + * will be complete when the following appears. + * + * Failover Complete + * + * A second message send/receive cycle is then done to validate that the + * connection/session are still working. + * + */ +public class QuickAcking extends QpidTestCase implements ConnectionListener +{ + protected AMQConnection _connection; + protected Queue _queue; + protected Session _session; + protected MessageConsumer _consumer; + private CountDownLatch _failedOver; + private static final String INDEX = "INDEX"; + private int _count = 0; + + public void setUp() + { + // Prevent broker startup. Broker must be run manually. + } + + public void test() throws Exception + { + _failedOver = new CountDownLatch(1); + + _connection = new AMQConnection("amqp://guest:guest@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''"); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _queue = _session.createQueue("QAtest"); + _consumer = _session.createConsumer(_queue); + _connection.setConnectionListener(this); + _connection.start(); + + sendAndReceive(); + + _failedOver.await(); + + sendAndReceive(); + + } + + private void sendAndReceive() + throws Exception + { + sendMessage(); + + Message message = _consumer.receive(); + + if (message.getIntProperty(INDEX) != _count) + { + throw new Exception("Incorrect message recieved:" + _count); + } + + if (_session.getTransacted()) + { + _session.commit(); + } + System.err.println("Recevied Message:" + _count); + } + + private void sendMessage() throws JMSException + { + MessageProducer producer = _session.createProducer(_queue); + Message message = _session.createMessage(); + _count++; + message.setIntProperty(INDEX, _count); + + producer.send(message); + if (_session.getTransacted()) + { + _session.commit(); + } + producer.close(); + + System.err.println("Sent Message:" + _count); + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + System.err.println("Failover Complete"); + _failedOver.countDown(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 7434fcbb30..4a123cb1dc 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -23,8 +23,7 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; -import org.apache.qpid.test.utils.QpidTestCase; - +import org.apache.qpid.test.utils.FailoverBaseCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +34,21 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.TextMessage; - import java.util.concurrent.atomic.AtomicInteger; -public class RecoverTest extends QpidTestCase +public class RecoverTest extends FailoverBaseCase { - private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); + static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); private Exception _error; private AtomicInteger count; + protected AMQConnection _connection; + protected Session _consumerSession; + protected MessageConsumer _consumer; + static final int SENT_COUNT = 4; + + @Override protected void setUp() throws Exception { super.setUp(); @@ -52,134 +56,110 @@ public class RecoverTest extends QpidTestCase count = new AtomicInteger(); } - protected void tearDown() throws Exception + protected void initTest() throws Exception { - super.tearDown(); - count = null; - } + _connection = (AMQConnection) getConnection("guest", "guest"); - public void testRecoverResendsMsgs() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), - new AMQShortString("someQ"), false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - // force synch to ensure the consumer has resulted in a bound queue - // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - // This is the default now + _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = _consumerSession.createQueue(getTestQueueName()); - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); + _consumer = _consumerSession.createConsumer(queue); _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - + sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT); _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. Calling recover with three outstanding messages"); - // no ack for last three messages so when I call recover I expect to get three messages back - consumerSession.recover(); - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg2", tm.getText()); + _connection.start(); + } - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm.getText()); + protected Message validateNextMessages(int nextCount, int startIndex) throws JMSException + { + Message message = null; + for (int index = 0; index < nextCount; index++) + { + message = _consumer.receive(3000); + assertEquals(startIndex + index, message.getIntProperty(INDEX)); + } + return message; + } - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm.getText()); + protected void validateRemainingMessages(int remaining) throws JMSException + { + int index = SENT_COUNT - remaining; - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); + Message message = null; + while (index != SENT_COUNT) + { + message = _consumer.receive(3000); + assertEquals(index++, message.getIntProperty(INDEX)); + } + + if (message != null) + { + _logger.info("Received redelivery of three messages. Acknowledging last message"); + message.acknowledge(); + } _logger.info("Calling acknowledge with no outstanding messages"); // all acked so no messages to be delivered - consumerSession.recover(); + _consumerSession.recover(); - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); + message = _consumer.receiveNoWait(); + assertNull(message); _logger.info("No messages redelivered as is expected"); - - con.close(); } - public void testRecoverResendsMsgsAckOnEarlier() throws Exception + public void testRecoverResendsMsgs() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + initTest(); - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), - new AMQShortString("someQ"), false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - // force synch to ensure the consumer has resulted in a bound queue - // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - // This is the default now + Message message = validateNextMessages(1, 0); + message.acknowledge(); + _logger.info("Received and acknowledged first message"); - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); + _consumer.receive(); + _consumer.receive(); + _consumer.receive(); + _logger.info("Received all four messages. Calling recover with three outstanding messages"); + // no ack for last three messages so when I call recover I expect to get three messages back - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); + _consumerSession.recover(); - con2.close(); + validateRemainingMessages(3); + } - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - consumer.receive(); - tm.acknowledge(); + public void testRecoverResendsMsgsAckOnEarlier() throws Exception + { + initTest(); + + Message message = validateNextMessages(2, 0); + message.acknowledge(); _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); - consumer.receive(); - consumer.receive(); + _consumer.receive(); + _consumer.receive(); _logger.info("Received all four messages. Calling recover with two outstanding messages"); // no ack for last three messages so when I call recover I expect to get three messages back - consumerSession.recover(); - TextMessage tm3 = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm3.getText()); + _consumerSession.recover(); + + Message message2 = _consumer.receive(3000); + assertEquals(2, message2.getIntProperty(INDEX)); - TextMessage tm4 = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm4.getText()); + Message message3 = _consumer.receive(3000); + assertEquals(3, message3.getIntProperty(INDEX)); _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); - ((org.apache.qpid.jms.Message) tm3).acknowledgeThis(); + ((org.apache.qpid.jms.Message) message2).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered - consumerSession.recover(); + _consumerSession.recover(); - tm4 = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm4.getText()); - ((org.apache.qpid.jms.Message) tm4).acknowledgeThis(); + message3 = _consumer.receive(3000); + assertEquals(3, message3.getIntProperty(INDEX)); + ((org.apache.qpid.jms.Message) message3).acknowledgeThis(); - _logger.info("Calling recover"); // all acked so no messages to be delivered - consumerSession.recover(); - - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - con.close(); + validateRemainingMessages(0); } public void testAcknowledgePerConsumer() throws Exception @@ -188,11 +168,11 @@ public class RecoverTest extends QpidTestCase Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), - false, true); + new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), + false, true); Queue queue2 = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), - false, true); + new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), + false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); @@ -231,8 +211,8 @@ public class RecoverTest extends QpidTestCase final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), - false, true); + new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), + false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = consumerSession.createProducer(queue); producer.send(consumerSession.createTextMessage("hello")); @@ -240,50 +220,50 @@ public class RecoverTest extends QpidTestCase final Object lock = new Object(); consumer.setMessageListener(new MessageListener() - { + { - public void onMessage(Message message) + public void onMessage(Message message) + { + try { - try + count.incrementAndGet(); + if (count.get() == 1) { - count.incrementAndGet(); - if (count.get() == 1) + if (message.getJMSRedelivered()) { - if (message.getJMSRedelivered()) - { - setError( + setError( new Exception("Message marked as redilvered on what should be first delivery attempt")); - } - - consumerSession.recover(); } - else if (count.get() == 2) + + consumerSession.recover(); + } + else if (count.get() == 2) + { + if (!message.getJMSRedelivered()) { - if (!message.getJMSRedelivered()) - { - setError( + setError( new Exception( - "Message not marked as redilvered on what should be second delivery attempt")); - } - } - else - { - System.err.println(message); - fail("Message delivered too many times!: " + count); + "Message not marked as redilvered on what should be second delivery attempt")); } } - catch (JMSException e) + else { - _logger.error("Error recovering session: " + e, e); - setError(e); + System.err.println(message); + fail("Message delivered too many times!: " + count); } + } + catch (JMSException e) + { + _logger.error("Error recovering session: " + e, e); + setError(e); + } - synchronized (lock) - { - lock.notify(); - } + synchronized (lock) + { + lock.notify(); } - }); + } + }); con.start(); @@ -323,9 +303,4 @@ public class RecoverTest extends QpidTestCase { _error = e; } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(RecoverTest.class); - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SelectorTest.java deleted file mode 100644 index c42e4c7582..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.basic; - -import org.apache.qpid.AMQException; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.BasicMessageProducer; -import org.apache.qpid.client.state.StateWaiter; -import org.apache.qpid.url.URLSyntaxException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.concurrent.CountDownLatch; - -public class SelectorTest extends QpidTestCase implements MessageListener -{ - private static final Logger _logger = LoggerFactory.getLogger(SelectorTest.class); - - private AMQConnection _connection; - private AMQDestination _destination; - private AMQSession _session; - private int count; - public String _connectionString = "vm://:1"; - private static final String INVALID_SELECTOR = "Cost LIKE 5"; - CountDownLatch _responseLatch = new CountDownLatch(1); - - protected void setUp() throws Exception - { - super.setUp(); - init((AMQConnection) getConnection("guest", "guest")); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - private void init(AMQConnection connection) throws JMSException - { - init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true)); - } - - private void init(AMQConnection connection, AMQDestination destination) throws JMSException - { - _connection = connection; - _destination = destination; - connection.start(); - - String selector = null; - selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; - // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; - - _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); - // _session.createConsumer(destination).setMessageListener(this); - _session.createConsumer(destination, selector).setMessageListener(this); - } - - public void test() throws Exception - { - try - { - - init((AMQConnection) getConnection("guest", "guest", randomize("Client"))); - - Message msg = _session.createTextMessage("Message"); - msg.setJMSPriority(1); - msg.setIntProperty("Cost", 2); - msg.setStringProperty("property-with-hyphen", "wibble"); - msg.setJMSType("Special"); - - _logger.info("Sending Message:" + msg); - - ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT); - _logger.info("Message sent, waiting for response..."); - - _responseLatch.await(); - - if (count > 0) - { - _logger.info("Got message"); - } - - if (count == 0) - { - fail("Did not get message!"); - // throw new RuntimeException("Did not get message!"); - } - } - catch (JMSException e) - { - _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - if (!(e instanceof InvalidSelectorException)) - { - fail("Wrong exception:" + e.getMessage()); - } - else - { - System.out.println("SUCCESS!!"); - } - } - catch (InterruptedException e) - { - _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage()); - } - catch (URLSyntaxException e) - { - _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - fail("Wrong exception"); - } - catch (AMQException e) - { - _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - fail("Wrong exception"); - } - - finally - { - if (_session != null) - { - _session.close(); - } - if (_connection != null) - { - _connection.close(); - } - } - } - - - public void testInvalidSelectors() throws Exception - { - Connection connection = null; - - try - { - connection = getConnection("guest", "guest", randomize("Client")); - _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); - } - catch (JMSException e) - { - fail(e.getMessage()); - } - catch (AMQException e) - { - fail(e.getMessage()); - } - catch (URLSyntaxException e) - { - fail("Error:" + e.getMessage()); - } - - //Try Creating a Browser - try - { - _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR); - } - catch (JMSException e) - { - _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - if (!(e instanceof InvalidSelectorException)) - { - fail("Wrong exception:" + e.getMessage()); - } - else - { - _logger.debug("SUCCESS!!"); - } - } - - //Try Creating a Consumer - try - { - _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR); - } - catch (JMSException e) - { - _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - if (!(e instanceof InvalidSelectorException)) - { - fail("Wrong exception:" + e.getMessage()); - } - else - { - _logger.debug("SUCCESS!!"); - } - } - - //Try Creating a Receiever - try - { - _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR); - } - catch (JMSException e) - { - _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); - if (!(e instanceof InvalidSelectorException)) - { - fail("Wrong exception:" + e.getMessage()); - } - else - { - _logger.debug("SUCCESS!!"); - } - } - - finally - { - if (_session != null) - { - try - { - _session.close(); - } - catch (JMSException e) - { - fail("Error cleaning up:" + e.getMessage()); - } - } - if (_connection != null) - { - try - { - _connection.close(); - } - catch (JMSException e) - { - fail("Error cleaning up:" + e.getMessage()); - } - } - } - } - - public void onMessage(Message message) - { - count++; - _logger.info("Got Message:" + message); - _responseLatch.countDown(); - } - - private static String randomize(String in) - { - return in + System.currentTimeMillis(); - } - - public static void main(String[] argv) throws Exception - { - SelectorTest test = new SelectorTest(); - test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0]; - - try - { - while (true) - { - if (test._connectionString.contains("vm://:1")) - { - test.setUp(); - } - test.test(); - - if (test._connectionString.contains("vm://:1")) - { - test.tearDown(); - } - } - } - catch (Exception e) - { - System.err.println(e.getMessage()); - e.printStackTrace(); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(SelectorTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 1f90f1e29f..6b69ccab6c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.Destination; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; @@ -105,7 +106,8 @@ public class JMSPropertiesTest extends QpidTestCase assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType()); assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo()); assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:")); - + assertEquals("JMS Default priority should be 4",Message.DEFAULT_PRIORITY,rm.getJMSPriority()); + //Validate that the JMSX values are correct assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID")); assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq")); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java new file mode 100644 index 0000000000..248042d2c4 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java @@ -0,0 +1,403 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.publish; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.FailoverBaseCase; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TransactionRolledBackException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * QPID-1816 : Whilst testing Acknoledgement after failover this completes testing + * of the client after failover. When we have a dirty session we should receive + * an error if we attempt to publish. This test ensures that both in the synchronous + * and asynchronous message delivery paths we receive the expected exceptions at + * the expected time. + */ +public class DirtyTrasactedPubilshTest extends FailoverBaseCase implements ConnectionListener +{ + protected CountDownLatch _failoverCompleted = new CountDownLatch(1); + + protected int NUM_MESSAGES; + protected Connection _connection; + protected Queue _queue; + protected Session _consumerSession; + protected MessageConsumer _consumer; + protected MessageProducer _producer; + + private static final String MSG = "MSG"; + private static final String SEND_FROM_ON_MESSAGE_TEXT = "sendFromOnMessage"; + protected CountDownLatch _receviedAll; + protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); + + @Override + protected void setUp() throws Exception + { + super.setUp(); + NUM_MESSAGES = 10; + + _queue = getTestQueue(); + + //Create Producer put some messages on the queue + _connection = getConnection(); + } + + /** + * Initialise the test variables + * @param transacted is this a transacted test + * @param mode if not trasacted then what ack mode to use + * @throws Exception if there is a setup issue. + */ + protected void init(boolean transacted, int mode) throws Exception + { + _consumerSession = _connection.createSession(transacted, mode); + _consumer = _consumerSession.createConsumer(_queue); + _producer = _consumerSession.createProducer(_queue); + + // These should all end up being prefetched by session + sendMessage(_consumerSession, _queue, 1); + + assertEquals("Wrong number of messages on queue", 1, + ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + } + + /** + * If a transacted session has failed over whilst it has uncommitted sent + * data then we need to throw a TransactedRolledbackException on commit() + * + * The alternative would be to maintain a replay buffer so that the message + * could be resent. This is not currently implemented + * + * @throws Exception if something goes wrong. + */ + public void testDirtySendingSynchronousTransacted() throws Exception + { + Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + + // Ensure we get failover notifications + ((AMQConnection) _connection).setConnectionListener(this); + + MessageProducer producer = producerSession.createProducer(_queue); + + // Create and send message 0 + Message msg = producerSession.createMessage(); + msg.setIntProperty(INDEX, 0); + producer.send(msg); + + // DON'T commit message .. fail connection + + failBroker(getFailingPort()); + + // Ensure destination exists for sending + producerSession.createConsumer(_queue).close(); + + // Send the next message + msg.setIntProperty(INDEX, 1); + try + { + producer.send(msg); + fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); + } + catch (JMSException jmse) + { + assertEquals("Early warning of dirty session not correct", + "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); + } + + // Ignore that the session is dirty and attempt to commit to validate the + // exception is thrown. AND that the above failure notification did NOT + // clean up the session. + + try + { + producerSession.commit(); + fail("Session is dirty we should get an TransactionRolledBackException"); + } + catch (TransactionRolledBackException trbe) + { + // Normal path. + } + + // Resending of messages should now work ok as the commit was forcilbly rolledback + msg.setIntProperty(INDEX, 0); + producer.send(msg); + msg.setIntProperty(INDEX, 1); + producer.send(msg); + + producerSession.commit(); + + assertEquals("Wrong number of messages on queue", 2, + ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue)); + } + + /** + * If a transacted session has failed over whilst it has uncommitted sent + * data then we need to throw a TransactedRolledbackException on commit() + * + * The alternative would be to maintain a replay buffer so that the message + * could be resent. This is not currently implemented + * + * @throws Exception if something goes wrong. + */ + public void testDirtySendingOnMessageTransacted() throws Exception + { + NUM_MESSAGES = 1; + _receviedAll = new CountDownLatch(NUM_MESSAGES); + ((AMQConnection) _connection).setConnectionListener(this); + + init(true, Session.SESSION_TRANSACTED); + + _consumer.setMessageListener(new MessageListener() + { + + public void onMessage(Message message) + { + try + { + // Create and send message 0 + Message msg = _consumerSession.createMessage(); + msg.setIntProperty(INDEX, 0); + _producer.send(msg); + + // DON'T commit message .. fail connection + + failBroker(getFailingPort()); + + // rep + repopulateBroker(); + + // Destination will exist as this failBroker will populate + // the queue with 1 message + + // Send the next message + msg.setIntProperty(INDEX, 1); + try + { + _producer.send(msg); + fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); + } + catch (JMSException jmse) + { + assertEquals("Early warning of dirty session not correct", + "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); + } + + // Ignore that the session is dirty and attempt to commit to validate the + // exception is thrown. AND that the above failure notification did NOT + // clean up the session. + + try + { + _consumerSession.commit(); + fail("Session is dirty we should get an TransactionRolledBackException"); + } + catch (TransactionRolledBackException trbe) + { + // Normal path. + } + + // Resend messages + msg.setIntProperty(INDEX, 0); + msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); + _producer.send(msg); + msg.setIntProperty(INDEX, 1); + msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); + _producer.send(msg); + + _consumerSession.commit(); + + // Stop this consumer .. can't do _consumer.stop == DEADLOCK + // this doesn't seem to stop dispatcher running + _connection.stop(); + + // Signal that the onMessage send part of test is complete + // main thread can validate that messages are correct + _receviedAll.countDown(); + + } + catch (Exception e) + { + fail(e); + } + + } + + }); + + _connection.start(); + + if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS)) + { + // Check to see if we ended due to an exception in the onMessage handler + Exception cause = _causeOfFailure.get(); + if (cause != null) + { + cause.printStackTrace(); + fail(cause.getMessage()); + } + else + { + fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES); + } + } + + // Check to see if we ended due to an exception in the onMessage handler + Exception cause = _causeOfFailure.get(); + if (cause != null) + { + cause.printStackTrace(); + fail(cause.getMessage()); + } + + _consumer.close(); + _consumerSession.close(); + + _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _connection.start(); + + // Validate that we could send the messages as expected. + assertEquals("Wrong number of messages on queue", 3, + ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); + + MessageConsumer consumer = _consumerSession.createConsumer(_queue); + + //Validate the message sent to setup the failed over broker. + Message message = consumer.receive(1000); + assertNotNull("Message " + 0 + " not received.", message); + assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX)); + + // Validate the two messages sent from within the onMessage + for (int index = 0; index <= 1; index++) + { + message = consumer.receive(1000); + assertNotNull("Message " + index + " not received.", message); + assertEquals("Incorrect message received", index, message.getIntProperty(INDEX)); + assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG)); + } + + assertNull("Extra message received.", consumer.receiveNoWait()); + + _consumerSession.close(); + + assertEquals("Wrong number of messages on queue", 0, + ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue)); + } + + private void repopulateBroker() throws Exception + { + // Repopulate this new broker so we can test what happends after failover + + //Get the connection to the first (main port) broker. + Connection connection = getConnection(); + // Use a transaction to send messages so we can be sure they arrive. + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, NUM_MESSAGES); + + assertEquals("Wrong number of messages on queue", NUM_MESSAGES, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + + connection.close(); + } + + // AMQConnectionListener Interface.. used so we can validate that we + // actually failed over. + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover + return true; + } + + public boolean preResubscribe() + { + //Allow failover + return true; + } + + public void failoverComplete() + { + _failoverCompleted.countDown(); + } + + /** + * Override so we can block until failover has completd + * + * @param port int the port of the broker to fail. + */ + @Override + public void failBroker(int port) + { + super.failBroker(port); + + try + { + if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + { + fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); + } + } + catch (InterruptedException e) + { + fail("Failover was interuppted"); + } + } + + /** + * Pass the given exception back to the waiting thread to fail the test run. + * + * @param e The exception that is causing the test to fail. + */ + protected void fail(Exception e) + { + _causeOfFailure.set(e); + // End the test. + while (_receviedAll.getCount() != 0) + { + _receviedAll.countDown(); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java index 64bd1503ba..0426c4f45f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java @@ -20,44 +20,43 @@ */ package org.apache.qpid.test.utils; -import javax.jms.Connection; -import javax.jms.JMSException; +import org.apache.qpid.util.FileUtils; + import javax.naming.NamingException; -import org.apache.qpid.util.FileUtils; +import org.apache.qpid.client.AMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FailoverBaseCase extends QpidTestCase { + protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class); public static int FAILING_VM_PORT = 2; - public static int FAILING_PORT = DEFAULT_PORT + 100; + public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt")); + public static final long DEFAULT_FAILOVER_TIME = 10000L; protected int failingPort; - - private boolean failedOver = false; - public FailoverBaseCase() + protected int getFailingPort() { if (_broker.equals(VM)) { - failingPort = FAILING_VM_PORT; + return FAILING_VM_PORT; } else { - failingPort = FAILING_PORT; + return FAILING_PORT; } } - - protected int getFailingPort() - { - return failingPort; - } protected void setUp() throws java.lang.Exception { super.setUp(); - setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); - startBroker(failingPort); + // Set QPID_WORK to $QPID_WORK/<getFailingPort()> + // or /tmp/<getFailingPort()> if QPID_WORK not set. + setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK") + "/" + getFailingPort()); + startBroker(getFailingPort()); } /** @@ -66,42 +65,52 @@ public class FailoverBaseCase extends QpidTestCase * @return a connection * @throws Exception */ - public Connection getConnection() throws JMSException, NamingException + @Override + public AMQConnectionFactory getConnectionFactory() throws NamingException { - Connection conn = - (Boolean.getBoolean("profile.use_ssl"))? - getConnectionFactory("failover.ssl").createConnection("guest", "guest"): - getConnectionFactory("failover").createConnection("guest", "guest"); - _connections.add(conn); - return conn; + _logger.info("get ConnectionFactory"); + if (_connectionFactory == null) + { + if (Boolean.getBoolean("profile.use_ssl")) + { + _connectionFactory = getConnectionFactory("failover.ssl"); + } + else + { + _connectionFactory = getConnectionFactory("failover"); + } + } + return _connectionFactory; } + public void tearDown() throws Exception { - stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT); - super.tearDown(); - FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort()); + try + { + super.tearDown(); + } + finally + { + // Ensure we shutdown any secondary brokers, even if we are unable + // to cleanly tearDown the QTC. + stopBroker(getFailingPort()); + FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); + } } - /** - * Only used of VM borker. - */ - public void failBroker() + public void failBroker(int port) { - failedOver = true; try { - stopBroker(getFailingPort()); + stopBroker(port); } catch (Exception e) { throw new RuntimeException(e); } } - - protected void setFailingPort(int p) - { - failingPort = p; - } + + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 76e4118c96..6c1b1c7b8d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -22,8 +22,10 @@ import junit.framework.TestResult; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; @@ -32,6 +34,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.store.DerbyMessageStore; import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,13 +73,18 @@ public class QpidTestCase extends TestCase protected final String QpidHome = System.getProperty("QPID_HOME"); protected File _configFile = new File(System.getProperty("broker.config")); - private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class); + protected static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class); protected long RECEIVE_TIMEOUT = 1000l; - private Map<String, String> _setProperties = new HashMap<String, String>(); + private Map<String, String> _propertiesSetForTestOnly = new HashMap<String, String>(); + private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>(); + private Map<org.apache.log4j.Logger, Level> _loggerLevelSetForTest = new HashMap<org.apache.log4j.Logger, Level>(); + private XMLConfiguration _testConfiguration = new XMLConfiguration(); + protected static final String INDEX = "index"; + /** * Some tests are excluded when the property test.excludes is set to true. * An exclusion list is either a file (prop test.excludesfile) which contains one test name @@ -147,6 +155,7 @@ public class QpidTestCase extends TestCase private static final String BROKER_LANGUAGE = "broker.language"; private static final String BROKER = "broker"; private static final String BROKER_CLEAN = "broker.clean"; + private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests"; private static final String BROKER_VERSION = "broker.version"; protected static final String BROKER_READY = "broker.ready"; private static final String BROKER_STOPPED = "broker.stopped"; @@ -169,6 +178,7 @@ public class QpidTestCase extends TestCase protected String _brokerLanguage = System.getProperty(BROKER_LANGUAGE, JAVA); protected String _broker = System.getProperty(BROKER, VM); private String _brokerClean = System.getProperty(BROKER_CLEAN, null); + private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); private String _output = System.getProperty(TEST_OUTPUT); @@ -177,7 +187,7 @@ public class QpidTestCase extends TestCase private Map<Integer, Process> _brokers = new HashMap<Integer, Process>(); private InitialContext _initialContext; - private AMQConnectionFactory _connectionFactory; + protected AMQConnectionFactory _connectionFactory; private String _testName; @@ -235,6 +245,19 @@ public class QpidTestCase extends TestCase { _logger.error("exception stopping broker", e); } + + if(_brokerCleanBetweenTests) + { + try + { + cleanBroker(); + } + catch (Exception e) + { + _logger.error("exception cleaning up broker", e); + } + } + _logger.info("========== stop " + _testName + " =========="); if (redirected) @@ -451,6 +474,15 @@ public class QpidTestCase extends TestCase env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + _testName + "\""); env.put("QPID_WORK", System.getProperty("QPID_WORK")); + + // Use the environment variable to set amqj.logging.level for the broker + // The value used is a 'server' value in the test configuration to + // allow a differentiation between the client and broker logging levels. + if (System.getProperty("amqj.server.logging.level") != null) + { + setBrokerEnvironment("AMQJ_LOGGING_LEVEL", System.getProperty("amqj.server.logging.level")); + } + // Add all the environment settings the test requested if (!_env.isEmpty()) { @@ -460,13 +492,27 @@ public class QpidTestCase extends TestCase } } + + // Add default test logging levels that are used by the log4j-test + // Use the convenience methods to push the current logging setting + // in to the external broker's QPID_OPTS string. + if (System.getProperty("amqj.protocol.logging.level") != null) + { + setSystemProperty("amqj.protocol.logging.level"); + } + if (System.getProperty("root.logging.level") != null) + { + setSystemProperty("root.logging.level"); + } + + String QPID_OPTS = " "; // Add all the specified system properties to QPID_OPTS - if (!_setProperties.isEmpty()) + if (!_propertiesSetForBroker.isEmpty()) { - for (String key : _setProperties.keySet()) + for (String key : _propertiesSetForBroker.keySet()) { - QPID_OPTS += "-D" + key + "=" + System.getProperty(key) + " "; + QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " "; } if (env.containsKey("QPID_OPTS")) @@ -489,7 +535,7 @@ public class QpidTestCase extends TestCase if (!p.await(30, TimeUnit.SECONDS)) { - _logger.info("broker failed to become ready:" + p.getStopLine()); + _logger.info("broker failed to become ready (" + p.ready + "):" + p.getStopLine()); //Ensure broker has stopped process.destroy(); cleanBroker(); @@ -667,28 +713,87 @@ public class QpidTestCase extends TestCase } /** + * Set a System property that is to be applied only to the external test + * broker. + * + * This is a convenience method to enable the setting of a -Dproperty=value + * entry in QPID_OPTS + * + * This is only useful for the External Java Broker tests. + * + * @param property the property name + * @param value the value to set the property to + */ + protected void setBrokerOnlySystemProperty(String property, String value) + { + if (!_propertiesSetForBroker.containsKey(property)) + { + _propertiesSetForBroker.put(property, value); + } + + } + + /** + * Set a System (-D) property for this test run. + * + * This convenience method copies the current VMs System Property + * for the external VM Broker. + * + * @param property the System property to set + */ + protected void setSystemProperty(String property) + { + setSystemProperty(property, System.getProperty(property)); + } + + /** * Set a System property for the duration of this test. * * When the test run is complete the value will be reverted. * + * The values set using this method will also be propogated to the external + * Java Broker via a -D value defined in QPID_OPTS. + * + * If the value should not be set on the broker then use + * setTestClientSystemProperty(). + * * @param property the property to set * @param value the new value to use */ protected void setSystemProperty(String property, String value) { - if (!_setProperties.containsKey(property)) + // Record the value for the external broker + _propertiesSetForBroker.put(property, value); + + //Set the value for the test client vm aswell. + setTestClientSystemProperty(property, value); + } + + /** + * Set a System (-D) property for the external Broker of this test. + * + * @param property The property to set + * @param value the value to set it to. + */ + protected void setTestClientSystemProperty(String property, String value) + { + if (!_propertiesSetForTestOnly.containsKey(property)) { - _setProperties.put(property, System.getProperty(property)); - } + // Record the current value so we can revert it later. + _propertiesSetForTestOnly.put(property, System.getProperty(property)); + } System.setProperty(property, value); } + /** + * Restore the System property values that were set before this test run. + */ protected void revertSystemProperties() { - for (String key : _setProperties.keySet()) + for (String key : _propertiesSetForTestOnly.keySet()) { - String value = _setProperties.get(key); + String value = _propertiesSetForTestOnly.get(key); if (value != null) { System.setProperty(key, value); @@ -698,6 +803,12 @@ public class QpidTestCase extends TestCase System.clearProperty(key); } } + + _propertiesSetForTestOnly.clear(); + + // We don't change the current VMs settings for Broker only properties + // so we can just clear this map + _propertiesSetForBroker.clear(); } /** @@ -712,6 +823,40 @@ public class QpidTestCase extends TestCase } /** + * Adjust the VMs Log4j Settings just for this test run + * + * @param logger the logger to change + * @param level the level to set + */ + protected void setLoggerLevel(org.apache.log4j.Logger logger, Level level) + { + assertNotNull("Cannot set level of null logger", logger); + assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level); + + if (!_loggerLevelSetForTest.containsKey(logger)) + { + // Record the current value so we can revert it later. + _loggerLevelSetForTest.put(logger, logger.getLevel()); + } + + logger.setLevel(level); + } + + /** + * Restore the logging levels defined by this test. + */ + protected void revertLoggingLevels() + { + for (org.apache.log4j.Logger logger : _loggerLevelSetForTest.keySet()) + { + logger.setLevel(_loggerLevelSetForTest.get(logger)); + } + + _loggerLevelSetForTest.clear(); + + } + + /** * Check whether the broker is an 0.8 * * @return true if the broker is an 0_8 version, false otherwise. @@ -786,7 +931,7 @@ public class QpidTestCase extends TestCase { if (Boolean.getBoolean("profile.use_ssl")) { - _connectionFactory = getConnectionFactory("ssl"); + _connectionFactory = getConnectionFactory("default.ssl"); } else { @@ -876,15 +1021,32 @@ public class QpidTestCase extends TestCase return getClass().getSimpleName() + "-" + getName(); } + /** + * Return a Queue specific for this test. + * Uses getTestQueueName() as the name of the queue + * @return + */ + public Queue getTestQueue() + { + return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName()); + } + + protected void tearDown() throws java.lang.Exception { - // close all the connections used by this test. - for (Connection c : _connections) + try { - c.close(); + // close all the connections used by this test. + for (Connection c : _connections) + { + c.close(); + } + } + finally{ + // Ensure any problems with close does not interfer with property resets + revertSystemProperties(); + revertLoggingLevels(); } - - revertSystemProperties(); } /** @@ -921,17 +1083,23 @@ public class QpidTestCase extends TestCase public List<Message> sendMessage(Session session, Destination destination, int count) throws Exception { - return sendMessage(session, destination, count, 0); + return sendMessage(session, destination, count, 0, 0); } public List<Message> sendMessage(Session session, Destination destination, int count, int batchSize) throws Exception { + return sendMessage(session, destination, count, 0, batchSize); + } + + public List<Message> sendMessage(Session session, Destination destination, + int count, int offset, int batchSize) throws Exception + { List<Message> messages = new ArrayList<Message>(count); MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < count; i++) + for (int i = offset; i < (count + offset); i++) { Message next = createNextMessage(session, i); @@ -950,8 +1118,11 @@ public class QpidTestCase extends TestCase } // Ensure we commit the last messages - if (session.getTransacted() && (batchSize > 0) && - (count / batchSize != 0)) + // Commit the session if we are transacted and + // we have no batchSize or + // our count is not divible by batchSize. + if (session.getTransacted() && + ( batchSize == 0 || count % batchSize != 0)) { session.commit(); } @@ -961,7 +1132,11 @@ public class QpidTestCase extends TestCase public Message createNextMessage(Session session, int msgCount) throws JMSException { - return session.createMessage(); + Message message = session.createMessage(); + message.setIntProperty(INDEX, msgCount); + + return message; + } public ConnectionURL getConnectionURL() throws NamingException |