summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java142
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java119
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java373
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java69
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/VerifyAckingOkDuringClose.java160
5 files changed, 863 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
new file mode 100644
index 0000000000..039a172e4d
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import junit.framework.Assert;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.junit.concurrency.TestRunnable;
+import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+/**
+ * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method.
+ * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent
+ * before closing the connection.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Check that
+ * closing a connection whilst handling a message, blocks till completion of the handler. </table>
+ */
+public class CloseBeforeAckTest extends QpidBrokerTestCase
+{
+ private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class);
+
+ Connection connection;
+ Session session;
+ public static final String TEST_QUEUE_NAME = "TestQueue";
+ private int TEST_COUNT = 25;
+
+ class TestThread1 extends TestRunnable implements MessageListener
+ {
+ public void runWithExceptions() throws Exception
+ {
+ // Set this up to listen for message on the test session.
+ session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this);
+ }
+
+ public void onMessage(Message message)
+ {
+ // Give thread 2 permission to close the session.
+ allow(new int[] { 1 });
+
+ // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete.
+ waitFor(new int[] { 1 }, true);
+ }
+ }
+
+ TestThread1 testThread1 = new TestThread1();
+
+ TestRunnable testThread2 =
+ new TestRunnable()
+ {
+ public void runWithExceptions() throws Exception
+ {
+ // Send a message to be picked up by thread 1.
+ session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME),
+ session.createTextMessage("Hi there thread 1!"));
+
+ // Wait for thread 1 to pick up the message and give permission to continue.
+ waitFor(new int[] { 0 }, false);
+
+ // Close the connection.
+ session.close();
+
+ // Allow thread 1 to continue to completion, if it is erronously still waiting.
+ allow(new int[] { 1 });
+ }
+ };
+
+ public void testCloseBeforeAutoAck_QPID_397() throws Exception
+ {
+ // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks
+ // message at the end of the onMessage method, after a close has been sent.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ThreadTestCoordinator tt = new ThreadTestCoordinator(2);
+
+ tt.addTestThread(testThread1, 0);
+ tt.addTestThread(testThread2, 1);
+ tt.setDeadlockTimeout(500);
+ tt.run();
+
+ String errorMessage = tt.joinAndRetrieveMessages();
+
+ // Print any error messages or exceptions.
+ log.debug(errorMessage);
+
+ if (!tt.getExceptions().isEmpty())
+ {
+ for (Exception e : tt.getExceptions())
+ {
+ log.debug("Exception thrown during test thread: ", e);
+ }
+ }
+
+ Assert.assertTrue(errorMessage, "".equals(errorMessage));
+ }
+
+ public void closeBeforeAutoAckManyTimes() throws Exception
+ {
+ for (int i = 0; i < TEST_COUNT; i++)
+ {
+ testCloseBeforeAutoAck_QPID_397();
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connection = getConnection("guest", "guest");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
new file mode 100644
index 0000000000..6bc6c591ae
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Session;
+
+/** QPID-1809
+ *
+ * Race condition on error handling and close logic.
+ *
+ * See most often with SimpleACLTest as this test is the expects the server to
+ * shut the connection/channels. This sort of testing is not performed by many,
+ * if any, of the other system tests.
+ *
+ * The problem is that we have two threads
+ *
+ * MainThread Exception(Mina)Thread
+ * | |
+ * Performs |
+ * ACtion |
+ * | Receives Server
+ * | Close
+ * Blocks for |
+ * Response |
+ * | Starts To Notify
+ * | client
+ * | |
+ * | <----- Notify Main Thread
+ * Notification |
+ * wakes client |
+ * | |
+ * Client then |
+ * processes Error. |
+ * | |
+ * Potentially Attempting Close Channel/Connection
+ * Connection Close
+ *
+ * The two threads both attempt to close the connection but the main thread does
+ * so assuming that the connection is open and valid.
+ *
+ * The Exception thread must modify the connection so that no furter syncWait
+ * commands are performed.
+ *
+ * This test sends an ExchangeDeclare that is Asynchronous and will fail and
+ * so cause a ChannelClose error but we perform a syncWait so that we can be
+ * sure to test that the BlockingWaiter is correctly awoken.
+ *
+ */
+public class JavaServerCloseRaceConditionTest extends QpidBrokerTestCase
+{
+ private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup";
+
+ public void test() throws Exception
+ {
+
+ AMQConnection connection = (AMQConnection) getConnection();
+
+ AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set no wait true so that we block the connection
+ // Also set a different exchange class string so the attempt to declare
+ // the exchange causes an exchange.
+ ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null,
+ true, false, false, false, true, null);
+
+ AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId());
+
+ try
+ {
+ // block our thread so that can times out
+ connection.getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+ catch (Exception e)
+ {
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
+ }
+
+ try
+ {
+ // Depending on if the notification thread has closed the connection
+ // or not we may get an exception here when we attempt to close the
+ // connection. If we do get one then it should be the same as above
+ // an AMQAuthenticationException.
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME));
+ }
+
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
new file mode 100644
index 0000000000..de092fc893
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.QpidClientConnection;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.url.URLSyntaxException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MessageRequeueTest extends QpidBrokerTestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class);
+
+ protected static AtomicInteger consumerIds = new AtomicInteger(0);
+ protected final Integer numTestMessages = 150;
+
+ protected final int consumeTimeout = 3000;
+
+ protected final String queue = "direct://amq.direct//message-requeue-test-queue";
+ protected String payload = "Message:";
+
+ //protected final String BROKER = "vm://:1";
+ protected final String BROKER = "tcp://127.0.0.1:5672";
+ private boolean testReception = true;
+
+ private long[] receieved = new long[numTestMessages + 1];
+ private boolean passed = false;
+ QpidClientConnection conn;
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queue, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+ }
+
+ protected void tearDown() throws Exception
+ {
+
+ if (!passed) // clean up
+ {
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+
+ conn.disconnect();
+ }
+
+ super.tearDown();
+ }
+
+ /**
+ * multiple consumers
+ *
+ * @throws javax.jms.JMSException if a JMS problem occurs
+ * @throws InterruptedException on timeout
+ */
+ public void testDrain() throws Exception
+ {
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+
+ _logger.info("consuming queue " + queue);
+ Queue q = conn.getSession().createQueue(queue);
+
+ final MessageConsumer consumer = conn.getSession().createConsumer(q);
+ int messagesReceived = 0;
+
+ long[] messageLog = new long[numTestMessages + 1];
+
+ _logger.info("consuming...");
+ Message msg = consumer.receive(1000);
+ while (msg != null)
+ {
+ messagesReceived++;
+
+ long dt = ((AbstractJMSMessage) msg).getDeliveryTag();
+
+ int msgindex = msg.getIntProperty("index");
+ if (messageLog[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag()
+ + ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ messageLog[msgindex] = dt;
+
+ // get Next message
+ msg = consumer.receive(1000);
+ }
+
+ _logger.info("consuming done.");
+ conn.getSession().commit();
+ consumer.close();
+
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+
+ _logger.info("consumed: " + messagesReceived);
+
+ assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
+ // wit 0_10 we can have a delivery tag of 0
+ if (conn.isBroker08())
+ {
+ for (long b : messageLog)
+ {
+ if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+
+ index++;
+ }
+
+ assertEquals(list.toString(), 0, failed);
+ }
+
+ conn.disconnect();
+ passed = true;
+ }
+
+ /** multiple consumers
+ * Based on code subbmitted by client FT-304
+ */
+ public void testTwoCompetingConsumers()
+ {
+ Consumer c1 = new Consumer();
+ Consumer c2 = new Consumer();
+ Consumer c3 = new Consumer();
+ Consumer c4 = new Consumer();
+
+ Thread t1 = new Thread(c1);
+ Thread t2 = new Thread(c2);
+ Thread t3 = new Thread(c3);
+ Thread t4 = new Thread(c4);
+
+ t1.start();
+ t2.start();
+ t3.start();
+ // t4.start();
+
+ try
+ {
+ t1.join();
+ t2.join();
+ t3.join();
+ t4.join();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Unable to join to Consumer theads");
+ }
+
+ _logger.info("consumer 1 count is " + c1.getCount());
+ _logger.info("consumer 2 count is " + c2.getCount());
+ _logger.info("consumer 3 count is " + c3.getCount());
+ _logger.info("consumer 4 count is " + c4.getCount());
+
+ Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount();
+
+ // Check all messages were correctly delivered
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+ if (conn.isBroker08())
+ {
+ for (long b : receieved)
+ {
+ if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0)
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+
+ index++;
+ }
+
+ assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
+ }
+ assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
+ passed = true;
+ }
+
+ class Consumer implements Runnable
+ {
+ private Integer count = 0;
+ private Integer id;
+
+ public Consumer()
+ {
+ id = consumerIds.addAndGet(1);
+ }
+
+ public void run()
+ {
+ try
+ {
+ _logger.info("consumer-" + id + ": starting");
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+
+ _logger.info("consumer-" + id + ": connected, consuming...");
+ Message result;
+ do
+ {
+ result = conn.getNextMessage(queue, consumeTimeout);
+ if (result != null)
+ {
+
+ long dt = ((AbstractJMSMessage) result).getDeliveryTag();
+
+ if (testReception)
+ {
+ int msgindex = result.getIntProperty("index");
+ if (receieved[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":"
+ + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt
+ + "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ receieved[msgindex] = dt;
+ }
+
+ count++;
+ if ((count % 100) == 0)
+ {
+ _logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
+ }
+ }
+ }
+ while (result != null);
+
+ _logger.info("consumer-" + id + ": complete");
+ conn.disconnect();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public Integer getCount()
+ {
+ return count;
+ }
+
+ public Integer getId()
+ {
+ return id;
+ }
+ }
+
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ {
+ int run = 0;
+ // while (run < 10)
+ {
+ run++;
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("testRequeue run " + run);
+ }
+
+ String virtualHost = "/test";
+ String brokerlist = BROKER;
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ QpidClientConnection qpc = new QpidClientConnection(BROKER);
+ qpc.connect();
+ Connection conn = qpc. getConnection();
+
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue q = session.createQueue(queue);
+
+ _logger.debug("Create Consumer");
+ MessageConsumer consumer = session.createConsumer(q);
+
+ conn.start();
+
+ _logger.debug("Receiving msg");
+ Message msg = consumer.receive(2000);
+
+ assertNotNull("Message should not be null", msg);
+
+ // As we have not ack'd message will be requeued.
+ _logger.debug("Close Consumer");
+ consumer.close();
+
+ _logger.debug("Close Connection");
+ conn.close();
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
new file mode 100644
index 0000000000..8a6dfb86ee
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */package org.apache.qpid.test.unit.close;
+
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class TopicPublisherCloseTest extends QpidBrokerTestCase
+{
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testAllMethodsThrowAfterConnectionClose() throws Exception
+ {
+ // give external brokers a chance to start up
+ Thread.sleep(3000);
+
+ AMQConnection connection = (AMQConnection) getConnection("guest", "guest");
+
+ Topic destination1 = new AMQTopic(connection, "t1");
+ TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher pub = session1.createPublisher(destination1);
+ connection.close();
+ try
+ {
+ pub.getDeliveryMode();
+ fail("Expected exception not thrown");
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ // PASS
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/VerifyAckingOkDuringClose.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/VerifyAckingOkDuringClose.java
new file mode 100644
index 0000000000..3b30b7d63f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/VerifyAckingOkDuringClose.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+import java.util.ArrayList;
+
+/**
+ * QPID-1791
+ *
+ * The threading model in the Java broker (at least till 0.5) allows for the
+ * close to be handled immediately even if the broker is still processing state
+ * for that Session.
+ *
+ * This test verifys that QPID-1791 is has been handled.
+ *
+ * The problem was that the whilst the Session is busy processing Acks from the
+ * client the Close frame jumps in and clears the unAcknowledgeMap in an
+ * attempt to start processing them for closing the connection.
+ *
+ * If the session had a consumer consuming from a temporary queue. The closing
+ * thread dequeues and deletes the message that were on the uncknowledgedMap.
+ *
+ * However, the Acking thread currently does:
+ * queuEntry = unackedMap.get(messageID)
+ *
+ * dequeueAndDelete(queueEntry)
+ *
+ * unackedMap.remove(messageID)
+ *
+ * As a result the queueEntry is sitting in the unackedMap whilst it is being
+ * dequeuedAndDeleted which leaves the opportunity for the close thread to
+ * remove contents of the unackedMap for processing. The close thread will then
+ * dequeueAndDelete all these values one of which the acking thread is currently
+ * processing.
+ *
+ *
+ * Test Approach
+ *
+ * Send a lot of persistent messages (5000), the goal of which is to fill the
+ * pretch and to provide the broker with a lot of acks to process
+ *
+ * Using client ack and prefetch buffer of 5000 use receive to get 2500
+ * Use AMQMessage.acknowledgeThis() to send a single ack frame back to the
+ * broker per message so 2500 ack frames.
+ * This will give the broker a lot to process,
+ * Immediately send the consumer close after the acks are all gone.
+ * This will cause the remaining 2500 prefetched messages plus any that have
+ * not yet had their acks processed
+ * to be collected by the requeue() process potentially
+ */
+public class VerifyAckingOkDuringClose
+{
+
+ static final int MESSAGE_SENT = 5000;
+
+ public static void main(String[] args) throws Exception
+ {
+ //Check that we have the InitialContext Configured
+
+ if (System.getProperty(InitialContext.INITIAL_CONTEXT_FACTORY) == null)
+ {
+ System.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, PropertiesFileInitialContextFactory.class.getName());
+ }
+
+ if (System.getProperty(InitialContext.PROVIDER_URL) == null)
+ {
+ System.err.println(InitialContext.PROVIDER_URL + ": Is not set and is required to contain a 'default' ConnectionFactory.");
+ System.exit(1);
+ }
+
+ //Retreive the local factory from the properties file
+ // when used with perftest.properties this will be localhost:5672
+ AMQConnectionFactory factory = (AMQConnectionFactory) new InitialContext().lookup("default");
+
+ AMQConnection connection = (AMQConnection) factory.createConnection("guest", "guest");
+
+ //Use the AMQConnection Interface to set the prefetch to the number
+ // we are sending
+ Session session = connection.createSession(false,
+ Session.CLIENT_ACKNOWLEDGE,
+ MESSAGE_SENT);
+
+ Queue queue = session.createTemporaryQueue();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(queue);
+
+ Message message = session.createTextMessage("Close");
+
+ for (int i = 0; i < MESSAGE_SENT; i++)
+ {
+ message.setIntProperty("SequenceNumber", i);
+
+ producer.send(message);
+ }
+
+ // Put a reasonable about of data on the queue.
+
+ //Receive all the messags
+ ArrayList<Message> received = new ArrayList<Message>();
+
+ message = consumer.receive(2000);
+
+ while (message != null)
+ {
+ received.add(message);
+ message = consumer.receive(2000);
+ }
+
+ //Check we have all the messages
+ if (received.size() != MESSAGE_SENT)
+ {
+ System.err.println("Test Failed Not all the messages received:" + received.size());
+ System.exit(1);
+ }
+
+ //individually ack the first half then close
+ for (int i = 0; i < MESSAGE_SENT / 2; i++)
+ {
+ ((org.apache.qpid.jms.Message) received.get(i)).acknowledgeThis();
+ }
+
+ // Close the Session to force a requeue on the server of the unackedMsgs
+
+ System.out.println("Killing client to force requeue on broker");
+
+ System.exit(1);
+ }
+
+}