diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-12-20 20:12:25 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-12-20 20:12:25 +0000 |
commit | 13e1148321e73cf06d5b1037c0ee035431ff5554 (patch) | |
tree | 90ca164558496b0c9e4c9a69c246832a09b9cb55 | |
parent | 4c55269f3725c5a29499f3615da401ad39092394 (diff) | |
download | qpid-python-13e1148321e73cf06d5b1037c0ee035431ff5554.tar.gz |
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@606016 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java new file mode 100644 index 0000000000..f94058f243 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -0,0 +1,151 @@ +package org.apache.qpid.test.unit.ack; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.test.VMTestCase; + +public class AcknowledgeTest extends VMTestCase +{ + private static final int NUM_MESSAGES = 50; + private Connection _con; + private Queue _queue; + private MessageProducer _producer; + private Session _producerSession; + private Session _consumerSession; + private MessageConsumer _consumerA; + private MessageConsumer _consumerB; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + _queue = (Queue) _context.lookup("queue"); + + //CreateQueue + ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + _con = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + _con.start(); + } + + private void init(boolean transacted, int mode) throws JMSException { + _producerSession = _con.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerSession = _con.createSession(transacted, mode); + _producer = _producerSession.createProducer(_queue); + _consumerA = _consumerSession.createConsumer(_queue); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + try + { + TransportConnection.killAllVMBrokers(); + //ApplicationRegistry.removeAll(); + } + catch (Exception e) + { + fail("Unable to clean up"); + } + + } + + private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException + { + Message msg; + for (int i = 0; i < toConsume; i++) + { + msg = consumer.receive(1000); + assertNotNull("Message " + i + " was null!", msg); + assertEquals("message " + i, ((TextMessage) msg).getText()); + } + } + + private void sendMessages(int totalMessages) throws JMSException + { + for (int i = 0; i < totalMessages; i++) + { + _producer.send(_producerSession.createTextMessage("message " + i)); + } + } + + private void testMessageAck(boolean transacted, int mode) throws Exception + { + init(transacted, mode); + sendMessages(NUM_MESSAGES/2); + Thread.sleep(1500); + _consumerB = _consumerSession.createConsumer(_queue); + sendMessages(NUM_MESSAGES/2); + int count = 0; + Message msg = _consumerB.receive(100); + while (msg != null) + { + if (mode == Session.CLIENT_ACKNOWLEDGE) + { + msg.acknowledge(); + } + count++; + msg = _consumerB.receive(1500); + } + if (transacted) + { + _consumerSession.commit(); + } + _consumerA.close(); + _consumerB.close(); + _consumerSession.close(); + assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName())); + } + + public void test2ConsumersAutoAck() throws Exception + { + testMessageAck(false, Session.AUTO_ACKNOWLEDGE); + } + + public void test2ConsumersClientAck() throws Exception + { + testMessageAck(true, Session.CLIENT_ACKNOWLEDGE); + } + + public void test2ConsumersTx() throws Exception + { + testMessageAck(true, Session.AUTO_ACKNOWLEDGE); + } + +} |