summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-12-20 20:12:25 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-12-20 20:12:25 +0000
commit13e1148321e73cf06d5b1037c0ee035431ff5554 (patch)
tree90ca164558496b0c9e4c9a69c246832a09b9cb55
parent4c55269f3725c5a29499f3615da401ad39092394 (diff)
downloadqpid-python-13e1148321e73cf06d5b1037c0ee035431ff5554.tar.gz
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@606016 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java151
1 files changed, 151 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
new file mode 100644
index 0000000000..f94058f243
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -0,0 +1,151 @@
+package org.apache.qpid.test.unit.ack;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.test.VMTestCase;
+
+public class AcknowledgeTest extends VMTestCase
+{
+ private static final int NUM_MESSAGES = 50;
+ private Connection _con;
+ private Queue _queue;
+ private MessageProducer _producer;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageConsumer _consumerA;
+ private MessageConsumer _consumerB;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _queue = (Queue) _context.lookup("queue");
+
+ //CreateQueue
+ ((ConnectionFactory) _context.lookup("connection")).createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(_queue).close();
+
+ //Create Producer put some messages on the queue
+ _con = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ _con.start();
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException {
+ _producerSession = _con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerSession = _con.createSession(transacted, mode);
+ _producer = _producerSession.createProducer(_queue);
+ _consumerA = _consumerSession.createConsumer(_queue);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ try
+ {
+ TransportConnection.killAllVMBrokers();
+ //ApplicationRegistry.removeAll();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to clean up");
+ }
+
+ }
+
+ private void consumeMessages(int toConsume, MessageConsumer consumer) throws JMSException
+ {
+ Message msg;
+ for (int i = 0; i < toConsume; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull("Message " + i + " was null!", msg);
+ assertEquals("message " + i, ((TextMessage) msg).getText());
+ }
+ }
+
+ private void sendMessages(int totalMessages) throws JMSException
+ {
+ for (int i = 0; i < totalMessages; i++)
+ {
+ _producer.send(_producerSession.createTextMessage("message " + i));
+ }
+ }
+
+ private void testMessageAck(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+ sendMessages(NUM_MESSAGES/2);
+ Thread.sleep(1500);
+ _consumerB = _consumerSession.createConsumer(_queue);
+ sendMessages(NUM_MESSAGES/2);
+ int count = 0;
+ Message msg = _consumerB.receive(100);
+ while (msg != null)
+ {
+ if (mode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+ count++;
+ msg = _consumerB.receive(1500);
+ }
+ if (transacted)
+ {
+ _consumerSession.commit();
+ }
+ _consumerA.close();
+ _consumerB.close();
+ _consumerSession.close();
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count, getMessageCount(_queue.getQueueName()));
+ }
+
+ public void test2ConsumersAutoAck() throws Exception
+ {
+ testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersClientAck() throws Exception
+ {
+ testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersTx() throws Exception
+ {
+ testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+ }
+
+}