diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java new file mode 100644 index 0000000000..6c83136511 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * This is a quick manual test to validate acking after failover with a + * transacted session. + * + * Start an external broker then run this test. Std Err will print. + * Sent Message: 1 + * Received Message: 1 + * + * You can then restart the external broker, which will cause failover, which + * will be complete when the following appears. + * + * Failover Complete + * + * A second message send/receive cycle is then done to validate that the + * connection/session are still working. + * + */ +public class QuickAcking extends QpidBrokerTestCase implements ConnectionListener +{ + protected AMQConnection _connection; + protected Queue _queue; + protected Session _session; + protected MessageConsumer _consumer; + private CountDownLatch _failedOver; + private static final String INDEX = "INDEX"; + private int _count = 0; + + public void setUp() + { + // Prevent broker startup. Broker must be run manually. + } + + public void test() throws Exception + { + _failedOver = new CountDownLatch(1); + + _connection = new AMQConnection("amqp://guest:guest@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''"); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _queue = _session.createQueue("QAtest"); + _consumer = _session.createConsumer(_queue); + _connection.setConnectionListener(this); + _connection.start(); + + sendAndReceive(); + + _failedOver.await(); + + sendAndReceive(); + + } + + private void sendAndReceive() + throws Exception + { + sendMessage(); + + Message message = _consumer.receive(); + + if (message.getIntProperty(INDEX) != _count) + { + throw new Exception("Incorrect message recieved:" + _count); + } + + if (_session.getTransacted()) + { + _session.commit(); + } + System.err.println("Recevied Message:" + _count); + } + + private void sendMessage() throws JMSException + { + MessageProducer producer = _session.createProducer(_queue); + Message message = _session.createMessage(); + _count++; + message.setIntProperty(INDEX, _count); + + producer.send(message); + if (_session.getTransacted()) + { + _session.commit(); + } + producer.close(); + + System.err.println("Sent Message:" + _count); + } + + public void bytesSent(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bytesReceived(long count) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + System.err.println("Failover Complete"); + _failedOver.countDown(); + } +} |