diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close')
5 files changed, 863 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java new file mode 100644 index 0000000000..039a172e4d --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.close; + +import junit.framework.Assert; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.junit.concurrency.TestRunnable; +import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; + +/** + * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method. + * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent + * before closing the connection. + * + * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Check that + * closing a connection whilst handling a message, blocks till completion of the handler. </table> + */ +public class CloseBeforeAckTest extends QpidBrokerTestCase +{ + private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class); + + Connection connection; + Session session; + public static final String TEST_QUEUE_NAME = "TestQueue"; + private int TEST_COUNT = 25; + + class TestThread1 extends TestRunnable implements MessageListener + { + public void runWithExceptions() throws Exception + { + // Set this up to listen for message on the test session. + session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this); + } + + public void onMessage(Message message) + { + // Give thread 2 permission to close the session. + allow(new int[] { 1 }); + + // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete. + waitFor(new int[] { 1 }, true); + } + } + + TestThread1 testThread1 = new TestThread1(); + + TestRunnable testThread2 = + new TestRunnable() + { + public void runWithExceptions() throws Exception + { + // Send a message to be picked up by thread 1. + session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME), + session.createTextMessage("Hi there thread 1!")); + + // Wait for thread 1 to pick up the message and give permission to continue. + waitFor(new int[] { 0 }, false); + + // Close the connection. + session.close(); + + // Allow thread 1 to continue to completion, if it is erronously still waiting. + allow(new int[] { 1 }); + } + }; + + public void testCloseBeforeAutoAck_QPID_397() throws Exception + { + // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks + // message at the end of the onMessage method, after a close has been sent. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ThreadTestCoordinator tt = new ThreadTestCoordinator(2); + + tt.addTestThread(testThread1, 0); + tt.addTestThread(testThread2, 1); + tt.setDeadlockTimeout(500); + tt.run(); + + String errorMessage = tt.joinAndRetrieveMessages(); + + // Print any error messages or exceptions. + log.debug(errorMessage); + + if (!tt.getExceptions().isEmpty()) + { + for (Exception e : tt.getExceptions()) + { + log.debug("Exception thrown during test thread: ", e); + } + } + + Assert.assertTrue(errorMessage, "".equals(errorMessage)); + } + + public void closeBeforeAutoAckManyTimes() throws Exception + { + for (int i = 0; i < TEST_COUNT; i++) + { + testCloseBeforeAutoAck_QPID_397(); + } + } + + protected void setUp() throws Exception + { + super.setUp(); + connection = getConnection("guest", "guest"); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java new file mode 100644 index 0000000000..6bc6c591ae --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.close; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Session; + +/** QPID-1809 + * + * Race condition on error handling and close logic. + * + * See most often with SimpleACLTest as this test is the expects the server to + * shut the connection/channels. This sort of testing is not performed by many, + * if any, of the other system tests. + * + * The problem is that we have two threads + * + * MainThread Exception(Mina)Thread + * | | + * Performs | + * ACtion | + * | Receives Server + * | Close + * Blocks for | + * Response | + * | Starts To Notify + * | client + * | | + * | <----- Notify Main Thread + * Notification | + * wakes client | + * | | + * Client then | + * processes Error. | + * | | + * Potentially Attempting Close Channel/Connection + * Connection Close + * + * The two threads both attempt to close the connection but the main thread does + * so assuming that the connection is open and valid. + * + * The Exception thread must modify the connection so that no furter syncWait + * commands are performed. + * + * This test sends an ExchangeDeclare that is Asynchronous and will fail and + * so cause a ChannelClose error but we perform a syncWait so that we can be + * sure to test that the BlockingWaiter is correctly awoken. + * + */ +public class JavaServerCloseRaceConditionTest extends QpidBrokerTestCase +{ + private static final String EXCHANGE_NAME = "NewExchangeNametoFailLookup"; + + public void test() throws Exception + { + + AMQConnection connection = (AMQConnection) getConnection(); + + AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set no wait true so that we block the connection + // Also set a different exchange class string so the attempt to declare + // the exchange causes an exchange. + ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null, + true, false, false, false, true, null); + + AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId()); + + try + { + // block our thread so that can times out + connection.getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + catch (Exception e) + { + assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME)); + } + + try + { + // Depending on if the notification thread has closed the connection + // or not we may get an exception here when we attempt to close the + // connection. If we do get one then it should be the same as above + // an AMQAuthenticationException. + connection.close(); + } + catch (Exception e) + { + assertTrue("Exception should say the exchange is not known.", e.getMessage().contains("Unknown exchange: " + EXCHANGE_NAME)); + } + + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java new file mode 100644 index 0000000000..de092fc893 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.close; + +import org.apache.qpid.AMQException; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.QpidClientConnection; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.url.URLSyntaxException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import java.util.concurrent.atomic.AtomicInteger; + +public class MessageRequeueTest extends QpidBrokerTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); + protected final Integer numTestMessages = 150; + + protected final int consumeTimeout = 3000; + + protected final String queue = "direct://amq.direct//message-requeue-test-queue"; + protected String payload = "Message:"; + + //protected final String BROKER = "vm://:1"; + protected final String BROKER = "tcp://127.0.0.1:5672"; + private boolean testReception = true; + + private long[] receieved = new long[numTestMessages + 1]; + private boolean passed = false; + QpidClientConnection conn; + + + protected void setUp() throws Exception + { + super.setUp(); + + conn = new QpidClientConnection(BROKER); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queue, payload, numTestMessages); + // close this connection + conn.disconnect(); + } + + protected void tearDown() throws Exception + { + + if (!passed) // clean up + { + QpidClientConnection conn = new QpidClientConnection(BROKER); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + + conn.disconnect(); + } + + super.tearDown(); + } + + /** + * multiple consumers + * + * @throws javax.jms.JMSException if a JMS problem occurs + * @throws InterruptedException on timeout + */ + public void testDrain() throws Exception + { + QpidClientConnection conn = new QpidClientConnection(BROKER); + + conn.connect(); + + _logger.info("consuming queue " + queue); + Queue q = conn.getSession().createQueue(queue); + + final MessageConsumer consumer = conn.getSession().createConsumer(q); + int messagesReceived = 0; + + long[] messageLog = new long[numTestMessages + 1]; + + _logger.info("consuming..."); + Message msg = consumer.receive(1000); + while (msg != null) + { + messagesReceived++; + + long dt = ((AbstractJMSMessage) msg).getDeliveryTag(); + + int msgindex = msg.getIntProperty("index"); + if (messageLog[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + messageLog[msgindex] = dt; + + // get Next message + msg = consumer.receive(1000); + } + + _logger.info("consuming done."); + conn.getSession().commit(); + consumer.close(); + + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + _logger.info("consumed: " + messagesReceived); + + assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); + // wit 0_10 we can have a delivery tag of 0 + if (conn.isBroker08()) + { + for (long b : messageLog) + { + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + + index++; + } + + assertEquals(list.toString(), 0, failed); + } + + conn.disconnect(); + passed = true; + } + + /** multiple consumers + * Based on code subbmitted by client FT-304 + */ + public void testTwoCompetingConsumers() + { + Consumer c1 = new Consumer(); + Consumer c2 = new Consumer(); + Consumer c3 = new Consumer(); + Consumer c4 = new Consumer(); + + Thread t1 = new Thread(c1); + Thread t2 = new Thread(c2); + Thread t3 = new Thread(c3); + Thread t4 = new Thread(c4); + + t1.start(); + t2.start(); + t3.start(); + // t4.start(); + + try + { + t1.join(); + t2.join(); + t3.join(); + t4.join(); + } + catch (InterruptedException e) + { + fail("Unable to join to Consumer theads"); + } + + _logger.info("consumer 1 count is " + c1.getCount()); + _logger.info("consumer 2 count is " + c2.getCount()); + _logger.info("consumer 3 count is " + c3.getCount()); + _logger.info("consumer 4 count is " + c4.getCount()); + + Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount(); + + // Check all messages were correctly delivered + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + if (conn.isBroker08()) + { + for (long b : receieved) + { + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0) + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + + index++; + } + + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); + } + assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); + passed = true; + } + + class Consumer implements Runnable + { + private Integer count = 0; + private Integer id; + + public Consumer() + { + id = consumerIds.addAndGet(1); + } + + public void run() + { + try + { + _logger.info("consumer-" + id + ": starting"); + QpidClientConnection conn = new QpidClientConnection(BROKER); + + conn.connect(); + + _logger.info("consumer-" + id + ": connected, consuming..."); + Message result; + do + { + result = conn.getNextMessage(queue, consumeTimeout); + if (result != null) + { + + long dt = ((AbstractJMSMessage) result).getDeliveryTag(); + + if (testReception) + { + int msgindex = result.getIntProperty("index"); + if (receieved[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + receieved[msgindex] = dt; + } + + count++; + if ((count % 100) == 0) + { + _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); + } + } + } + while (result != null); + + _logger.info("consumer-" + id + ": complete"); + conn.disconnect(); + + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + public Integer getCount() + { + return count; + } + + public Integer getId() + { + return id; + } + } + + public void testRequeue() throws JMSException, AMQException, URLSyntaxException + { + int run = 0; + // while (run < 10) + { + run++; + + if (_logger.isInfoEnabled()) + { + _logger.info("testRequeue run " + run); + } + + String virtualHost = "/test"; + String brokerlist = BROKER; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + QpidClientConnection qpc = new QpidClientConnection(BROKER); + qpc.connect(); + Connection conn = qpc. getConnection(); + + Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue q = session.createQueue(queue); + + _logger.debug("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); + + conn.start(); + + _logger.debug("Receiving msg"); + Message msg = consumer.receive(2000); + + assertNotNull("Message should not be null", msg); + + // As we have not ack'd message will be requeued. + _logger.debug("Close Consumer"); + consumer.close(); + + _logger.debug("Close Connection"); + conn.close(); + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java new file mode 100644 index 0000000000..8a6dfb86ee --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */package org.apache.qpid.test.unit.close; + +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * @author Apache Software Foundation + */ +public class TopicPublisherCloseTest extends QpidBrokerTestCase +{ + + protected void setUp() throws Exception + { + super.setUp(); + } + + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + public void testAllMethodsThrowAfterConnectionClose() throws Exception + { + // give external brokers a chance to start up + Thread.sleep(3000); + + AMQConnection connection = (AMQConnection) getConnection("guest", "guest"); + + Topic destination1 = new AMQTopic(connection, "t1"); + TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + TopicPublisher pub = session1.createPublisher(destination1); + connection.close(); + try + { + pub.getDeliveryMode(); + fail("Expected exception not thrown"); + } + catch (javax.jms.IllegalStateException e) + { + // PASS + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/VerifyAckingOkDuringClose.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/VerifyAckingOkDuringClose.java new file mode 100644 index 0000000000..3b30b7d63f --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/VerifyAckingOkDuringClose.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.close; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.InitialContext; +import java.util.ArrayList; + +/** + * QPID-1791 + * + * The threading model in the Java broker (at least till 0.5) allows for the + * close to be handled immediately even if the broker is still processing state + * for that Session. + * + * This test verifys that QPID-1791 is has been handled. + * + * The problem was that the whilst the Session is busy processing Acks from the + * client the Close frame jumps in and clears the unAcknowledgeMap in an + * attempt to start processing them for closing the connection. + * + * If the session had a consumer consuming from a temporary queue. The closing + * thread dequeues and deletes the message that were on the uncknowledgedMap. + * + * However, the Acking thread currently does: + * queuEntry = unackedMap.get(messageID) + * + * dequeueAndDelete(queueEntry) + * + * unackedMap.remove(messageID) + * + * As a result the queueEntry is sitting in the unackedMap whilst it is being + * dequeuedAndDeleted which leaves the opportunity for the close thread to + * remove contents of the unackedMap for processing. The close thread will then + * dequeueAndDelete all these values one of which the acking thread is currently + * processing. + * + * + * Test Approach + * + * Send a lot of persistent messages (5000), the goal of which is to fill the + * pretch and to provide the broker with a lot of acks to process + * + * Using client ack and prefetch buffer of 5000 use receive to get 2500 + * Use AMQMessage.acknowledgeThis() to send a single ack frame back to the + * broker per message so 2500 ack frames. + * This will give the broker a lot to process, + * Immediately send the consumer close after the acks are all gone. + * This will cause the remaining 2500 prefetched messages plus any that have + * not yet had their acks processed + * to be collected by the requeue() process potentially + */ +public class VerifyAckingOkDuringClose +{ + + static final int MESSAGE_SENT = 5000; + + public static void main(String[] args) throws Exception + { + //Check that we have the InitialContext Configured + + if (System.getProperty(InitialContext.INITIAL_CONTEXT_FACTORY) == null) + { + System.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, PropertiesFileInitialContextFactory.class.getName()); + } + + if (System.getProperty(InitialContext.PROVIDER_URL) == null) + { + System.err.println(InitialContext.PROVIDER_URL + ": Is not set and is required to contain a 'default' ConnectionFactory."); + System.exit(1); + } + + //Retreive the local factory from the properties file + // when used with perftest.properties this will be localhost:5672 + AMQConnectionFactory factory = (AMQConnectionFactory) new InitialContext().lookup("default"); + + AMQConnection connection = (AMQConnection) factory.createConnection("guest", "guest"); + + //Use the AMQConnection Interface to set the prefetch to the number + // we are sending + Session session = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE, + MESSAGE_SENT); + + Queue queue = session.createTemporaryQueue(); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + + Message message = session.createTextMessage("Close"); + + for (int i = 0; i < MESSAGE_SENT; i++) + { + message.setIntProperty("SequenceNumber", i); + + producer.send(message); + } + + // Put a reasonable about of data on the queue. + + //Receive all the messags + ArrayList<Message> received = new ArrayList<Message>(); + + message = consumer.receive(2000); + + while (message != null) + { + received.add(message); + message = consumer.receive(2000); + } + + //Check we have all the messages + if (received.size() != MESSAGE_SENT) + { + System.err.println("Test Failed Not all the messages received:" + received.size()); + System.exit(1); + } + + //individually ack the first half then close + for (int i = 0; i < MESSAGE_SENT / 2; i++) + { + ((org.apache.qpid.jms.Message) received.get(i)).acknowledgeThis(); + } + + // Close the Session to force a requeue on the server of the unackedMsgs + + System.out.println("Killing client to force requeue on broker"); + + System.exit(1); + } + +} |