diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java new file mode 100644 index 0000000000..e1f639afb6 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java @@ -0,0 +1,213 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.test.client; + +import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.log4j.Logger; + +import javax.jms.*; + +public class FlowControlTest extends QpidBrokerTestCase +{ + private static final Logger _logger = Logger.getLogger(FlowControlTest.class); + + private Connection _clientConnection; + private Session _clientSession; + private Queue _queue; + + /** + * Simply + * + * @throws Exception + */ + public void testBasicBytesFlowControl() throws Exception + { + _queue = (Queue) getInitialContext().lookup("queue"); + + //Create Client + _clientConnection = getConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = getConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg", 1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[128]); + m2.setIntProperty("msg", 2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[256]); + m3.setIntProperty("msg", 3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + Connection consumerConnection = getConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8) consumerSession).setPrefetchLimits(0, 256); + MessageConsumer recv = consumerSession.createConsumer(_queue); + consumerConnection.start(); + + Message r1 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + Message r2 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv.receive(RECEIVE_TIMEOUT); + assertNull("Third message incorrectly delivered", r3); + + ((AbstractJMSMessage)r1).acknowledgeThis(); + + r3 = recv.receive(RECEIVE_TIMEOUT); + assertNull("Third message incorrectly delivered", r3); + + ((AbstractJMSMessage)r2).acknowledgeThis(); + + r3 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + ((AbstractJMSMessage)r3).acknowledgeThis(); + consumerConnection.close(); + } + + public void testTwoConsumersBytesFlowControl() throws Exception + { + _queue = (Queue) getInitialContext().lookup("queue"); + + //Create Client + _clientConnection = getConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = getConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg", 1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[256]); + m2.setIntProperty("msg", 2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[128]); + m3.setIntProperty("msg", 3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + Connection consumerConnection = getConnection(); + Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8) consumerSession1).setPrefetchLimits(0, 256); + MessageConsumer recv1 = consumerSession1.createConsumer(_queue); + + consumerConnection.start(); + + Message r1 = recv1.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + Message r2 = recv1.receive(RECEIVE_TIMEOUT); + assertNull("Second message incorrectly delivered", r2); + + Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256); + MessageConsumer recv2 = consumerSession2.createConsumer(_queue); + + r2 = recv2.receive(RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv2.receive(RECEIVE_TIMEOUT); + assertNull("Third message incorrectly delivered", r3); + + r3 = recv1.receive(RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + r2.acknowledge(); + r3.acknowledge(); + recv1.close(); + recv2.close(); + consumerSession1.close(); + consumerSession2.close(); + consumerConnection.close(); + + } + + public static void main(String args[]) throws Throwable + { + FlowControlTest test = new FlowControlTest(); + + int run = 0; + while (true) + { + System.err.println("Test Run:" + ++run); + Thread.sleep(1000); + try + { + test.startBroker(); + test.testBasicBytesFlowControl(); + + Thread.sleep(1000); + } + finally + { + test.stopBroker(); + } + } + } +} + |