diff options
Diffstat (limited to 'java/client/src')
17 files changed, 355 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c3219e6564..bba39403a5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -182,6 +182,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _fastAccessConsumers[i] = null; } } + + + + public String toString() + { + return "{ Fast: " + Arrays.asList(_fastAccessConsumers) + " ; Slow: " + _slowAccessConsumers + "}"; + } } @@ -299,9 +306,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final IdToConsumerMap _consumers = new IdToConsumerMap(); - //Map<AMQShortString, BasicMessageConsumer> _consumers = - //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); - /** * Contains a list of consumers which have been removed but which might still have * messages to acknowledge, eg in client ack or transacted modes @@ -1419,7 +1423,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (message.isDeliverMessage()) { _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); + _queue.add(message); + } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index efbce6033b..5b1c1aeeee 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -809,9 +809,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeDelivered() { - while (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } @@ -1017,7 +1017,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); rollback(); - } + } clearReceiveQueue(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index d05e99d210..aa7599f355 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -46,7 +46,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic { final AMQProtocolSession session = stateManager.getProtocolSession(); final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedDeliverMessage(body); - _logger.debug("New JmsDeliver method received"); + _logger.debug("New JmsDeliver method received"); session.unprocessedMessageReceived(channelId, msg); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6a5cc62bfc..d19cd7f0d8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -297,15 +297,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession throw new AMQException("Error: received content body without having received a ContentHeader frame first"); } - /*try - {*/ + msg.receiveBody(contentBody); - /*} - catch (UnexpectedBodyReceivedException e) - { - _channelId2UnprocessedMsgMap.remove(channelId); - throw e; - }*/ if (msg.isAllBodyDataReceived()) { @@ -324,9 +317,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * @param channelId the channel id the message should be delivered to * @param msg the message */ - private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) + private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) throws AMQException { AMQSession session = getSession(channelId); + if(session == null) + { + throw new AMQException("Error: received message on non-existant channel:" + channelId); + } session.messageReceived(msg); if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index fe418535d6..ec45d7e182 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -126,7 +126,7 @@ public class AMQQueueDeferredOrderingTest extends TestCase _logger.info("Consuming messages"); for (int i = 0; i < NUM_MESSAGES; i++) { - Message msg = consumer.receive(3000); + Message msg = consumer.receive(90000); assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText()); diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 20632e245f..6e19f53ffe 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -128,7 +128,7 @@ public class MessageListenerMultiConsumerTest extends TestCase { int msg = 0; int MAX_LOOPS = MSG_COUNT * 2; - for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++) + for (int loops = 0; (msg < MSG_COUNT) && (loops < MAX_LOOPS); loops++) { if (_consumer1.receive(100) != null) @@ -220,12 +220,18 @@ public class MessageListenerMultiConsumerTest extends TestCase for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer1.receive() != null); + + final Message message = _consumer1.receive(100000); + if(message == null) + { + System.out.println("!!!!!!!! " + msg); + } + assertTrue(message != null); } for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(consumer2.receive() != null); + assertTrue(consumer2.receive(10000) != null); } } else @@ -235,12 +241,12 @@ public class MessageListenerMultiConsumerTest extends TestCase for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer1.receive() != null); + assertTrue(_consumer1.receive(10000) != null); } for (int msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer2.receive() != null); + assertTrue(_consumer2.receive(10000) != null); } } } diff --git a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java index 21f3e273aa..81d9a39dd3 100644 --- a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -116,6 +116,7 @@ public class ResetMessageListenerTest extends TestCase { _producer.send(_producerSession.createTextMessage("Message " + msg)); } +// Thread.sleep(120000); } @@ -247,6 +248,14 @@ public class ResetMessageListenerTest extends TestCase { _producer.send(_producerSession.createTextMessage("Message " + msg)); } +// try +// { +// Thread.sleep(120000); +// } +// catch (InterruptedException e) +// { +// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. +// } } catch (JMSException e) { @@ -257,7 +266,7 @@ public class ResetMessageListenerTest extends TestCase try { - _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS); + _allSecondMessagesSent.await(500000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index b6f46b4acc..9b34c36ec6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -115,6 +115,7 @@ public class RecoverTest extends TestCase consumerSession.recover(); tm = (TextMessage) consumer.receiveNoWait(); + assertNull(tm); _logger.info("No messages redelivered as is expected"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java index cc18169a5b..56247f9634 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java @@ -28,6 +28,9 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.AMQException; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +52,7 @@ public class SessionStartTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index 559e9a4741..3c2e72c07e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -69,6 +69,7 @@ public class ChannelCloseOkTest extends TestCase private static final Logger _log = LoggerFactory.getLogger(ChannelCloseOkTest.class); public String _connectionString = "vm://:1"; + private static final int NUM_MESSAGES = 300; protected void setUp() throws Exception { @@ -170,19 +171,18 @@ public class ChannelCloseOkTest extends TestCase // Ensure both sessions are still ok. // Send a bunch of messages as this give time for the sessions to be erroneously closed. - final int num = 300; - for (int i = 0; i < num; ++i) + for (int i = 0; i < NUM_MESSAGES; ++i) { send(_session1, _destination1, "" + i); send(_session2, _destination2, "" + i); } - waitFor(_received1, num + 1); - waitFor(_received2, num + 1); + waitFor(_received1, NUM_MESSAGES + 1); + waitFor(_received2, NUM_MESSAGES + 1); // Note that the third message is never received as it is sent to an incorrect destination. - assertEquals(num + 1, _received1.size()); - assertEquals(num + 1, _received2.size()); + assertEquals(NUM_MESSAGES + 1, _received1.size()); + assertEquals(NUM_MESSAGES + 1, _received2.size()); } private void sendAndWait(Session session, Destination destination, String message, List<Message> received, int count) @@ -199,15 +199,17 @@ public class ChannelCloseOkTest extends TestCase producer1.send(session.createTextMessage(message)); } - private void waitFor(List<Message> received, int count) throws InterruptedException + private void waitFor(List<Message> received, final int count) throws InterruptedException { + int lastSeen = -1; synchronized (received) { - while (received.size() < count) + while ((lastSeen != received.size()) && (lastSeen = received.size()) < count) { + try { - received.wait(); + received.wait(2000L); } catch (InterruptedException e) { @@ -216,6 +218,10 @@ public class ChannelCloseOkTest extends TestCase } } } + if(received.size() < count) + { + throw new RuntimeException("Expected: " + count + " got: " + received.size()); + } } private static String randomize(String in) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index 19ef612bcc..2ee29e3da4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -75,7 +75,9 @@ public class Client implements MessageListener public synchronized void onMessage(Message response) { + _logger.info("Received " + (++_count) + " of " + _expected + " responses."); + if (_count == _expected) { @@ -89,10 +91,10 @@ public class Client implements MessageListener if (_count < _expected) { - wait(10000L); + wait(1000L); } - if (_count < _expected) + if (_count != _expected) { throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java index 9cde24dd92..81227b9540 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java @@ -22,9 +22,13 @@ package org.apache.qpid.test.unit.client.forwardall; import junit.framework.TestCase; import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.server.queue.SimpleAMQQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; +import java.util.HashSet; + /** * Runs the Service's and Client parts of the test in the same process * as the broker @@ -34,6 +38,7 @@ public class CombinedTest extends TestCase private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class); private int run = 0; + protected void setUp() throws Exception { super.setUp(); @@ -47,16 +52,16 @@ public class CombinedTest extends TestCase public void testForwardAll() throws Exception { - while (run < 10) + while (run < 100) { int services = 2; ServiceCreator.start("vm://:1", services); - + Thread.sleep(100); _logger.info("Starting " + ++run + " client..."); new Client("vm://:1", services).shutdownWhenComplete(); - + ServiceCreator.closeAll(); _logger.info("Completed " + run + " successfully!"); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java index 6593f7d86a..bf03ce6899 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java @@ -37,14 +37,16 @@ public class Service implements MessageListener { private final AMQConnection _connection; private final AMQSession _session; + private final int _id; - Service(String broker) throws Exception + Service(String broker, int id) throws Exception { - this(connect(broker)); + this(connect(broker), id); } - Service(AMQConnection connection) throws Exception + Service(AMQConnection connection, int id) throws Exception { + _id = id; _connection = connection; AMQQueue queue = new SpecialQueue(connection, "ServiceQueue"); _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -56,7 +58,7 @@ public class Service implements MessageListener { try { - Message response = _session.createTextMessage("Response!"); + Message response = _session.createTextMessage("Response! " + _id); Destination replyTo = request.getJMSReplyTo(); _session.createProducer(replyTo).send(response); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java index be16f6b7ae..310a0993bc 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java @@ -34,17 +34,19 @@ public class ServiceCreator implements Runnable private final String broker; private Service service; + private final int id; - ServiceCreator(String broker) + ServiceCreator(String broker, final int id) { this.broker = broker; + this.id = id; } public void run() { try { - service = new Service(broker); + service = new Service(broker, id); } catch (Exception e) { @@ -76,11 +78,12 @@ public class ServiceCreator implements Runnable { threads = new Thread[services]; _services = new ServiceCreator[services]; - ServiceCreator runner = new ServiceCreator(broker); + //ServiceCreator runner = new ServiceCreator(broker); // start services _logger.info("Starting " + services + " services..."); for (int i = 0; i < services; i++) { + ServiceCreator runner = new ServiceCreator(broker,i); threads[i] = new Thread(runner); _services[i] = runner; threads[i].start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 8d7645c1fd..56904f20de 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -40,30 +40,40 @@ import javax.jms.Queue; import javax.jms.Session; import java.util.concurrent.atomic.AtomicInteger; +import java.util.Random; +import java.util.UUID; public class MessageRequeueTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); protected final Integer numTestMessages = 150; protected final int consumeTimeout = 3000; - protected final String queue = "direct://amq.direct//queue"; + //protected final String queue = "direct://amq.direct//queue"; protected String payload = "Message:"; protected final String BROKER = "vm://:1"; private boolean testReception = true; private long[] receieved = new long[numTestMessages + 1]; - private boolean passed = false; + //private boolean passed = false; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); + + } + + private void putMessagesOnQueueThenClose(String queue) + throws JMSException, InterruptedException + { QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -76,20 +86,25 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } - protected void tearDown() throws Exception + + + private void tearDownQueue(String queue) + throws JMSException, InterruptedException + { - super.tearDown(); + QpidClientConnection conn = new QpidClientConnection(BROKER); - if (!passed) // clean up - { - QpidClientConnection conn = new QpidClientConnection(BROKER); + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); - conn.connect(); - // clear queue - conn.consume(queue, consumeTimeout); + conn.disconnect(); + } - conn.disconnect(); - } + + protected void tearDown() throws Exception + { + super.tearDown(); TransportConnection.killVMBroker(1); } @@ -102,6 +117,11 @@ public class MessageRequeueTest extends TestCase */ public void testDrain() throws JMSException, InterruptedException { + + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + + putMessagesOnQueueThenClose(queue); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -172,18 +192,22 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); - passed = true; + tearDownQueue(queue); } /** multiple consumers * Based on code subbmitted by client FT-304 */ - public void testCompetingConsumers() + public void testCompetingConsumers() throws JMSException, InterruptedException { - Consumer c1 = new Consumer(); - Consumer c2 = new Consumer(); - Consumer c3 = new Consumer(); - Consumer c4 = new Consumer(); + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + + putMessagesOnQueueThenClose(queue); + + Consumer c1 = new Consumer(queue); + Consumer c2 = new Consumer(queue); + Consumer c3 = new Consumer(queue); + Consumer c4 = new Consumer(queue); Thread t1 = new Thread(c1); Thread t2 = new Thread(c2); @@ -237,16 +261,18 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertTrue("number of consumed messages does not match initial data: " + totalConsumed, numTestMessages <= totalConsumed); - passed = true; + tearDownQueue(queue); } class Consumer implements Runnable { private Integer count = 0; private Integer id; + private final String _queue; - public Consumer() + public Consumer(String queue) { + _queue = queue; id = consumerIds.addAndGet(1); } @@ -263,7 +289,7 @@ public class MessageRequeueTest extends TestCase Message result; do { - result = conn.getNextMessage(queue, consumeTimeout); + result = conn.getNextMessage(_queue, consumeTimeout); if (result != null) { @@ -322,8 +348,11 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException + public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException { + String queue = "direct://amq.direct//queue" + UUID.randomUUID(); + putMessagesOnQueueThenClose(queue); + int run = 0; // while (run < 10) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 101cba2352..98c0225096 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -88,7 +88,7 @@ public class DurableSubscriptionTest extends TestCase Message msg; _logger.info("Receive message on consumer 1:expecting A"); - msg = consumer1.receive(); + msg = consumer1.receive(1000); assertEquals("A", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(1000); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 065b06a87d..39730ef3ac 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,28 +20,33 @@ */ package org.apache.qpid.test.unit.topic; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; +import javax.jms.*; import javax.jms.MessageConsumer; import javax.jms.Session; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; +import javax.jms.Message; import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.qpid.jms.*; + +import java.util.UUID; /** @author Apache Software Foundation */ public class TopicSessionTest extends TestCase { private static final String BROKER = "vm://:1"; + private static final int THREADS = 20; + private static final int MESSAGE_COUNT = 10000; + private static final int MESSAGE_SIZE = 128; protected void setUp() throws Exception { @@ -102,6 +107,60 @@ public class TopicSessionTest extends TestCase subscriptionNameReuseForDifferentTopic(true); } + public void notestSilly() throws Exception + { + + + final ExceptionListener listener = new ExceptionListener() + { + public void onException(JMSException jmsException) + { + //To change body of implemented methods use File | Settings | File Templates. + } + }; + + + Thread[] threads = new Thread[100]; + + for(int j = 0; j < 20; j++) + { + threads[j] = new Thread(new Runnable() { + public void run() + { + try + { + AMQConnection con = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con, "MyTopic1" + UUID.randomUUID()); + + + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + + con.setExceptionListener(listener); + + TopicPublisher publisher = session1.createPublisher(topic); + + con.start(); + + while(true) + { + publisher.publish(session1.createTextMessage("hello")); + Thread.sleep(THREADS); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + }); + threads[j].run(); + } + + threads[0].join(); + + } + + private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception { AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); @@ -368,8 +427,160 @@ public class TopicSessionTest extends TestCase con2.close(); } + + public void noTestPublishToManyConsumers() throws Exception + { + + + final ExceptionListener exceptionListener = new ExceptionListener() + { + public void onException(JMSException jmsException) + { + jmsException.printStackTrace(); + } + }; + + + + SubscribingThread[] threads = new SubscribingThread[100]; + + final String topicName = "MyTopic1" + UUID.randomUUID(); + for(int j = 0; j < 21; j++) + { + final int threadId = j; + threads[threadId] = new SubscribingThread(threadId, topicName, exceptionListener); + threads[j].start(); + Thread.sleep(100); + } + + + threads[1].join(); + + int totalMessages = 0; + + for(int j = 1; j < 21; j++) + { + + System.err.println("Thread " + j + ": " + threads[j].msgId); + totalMessages += threads[j].msgId; + } + + System.err.println("****** Total: " + totalMessages); + + + } + + + + public static junit.framework.Test suite() { return new junit.framework.TestSuite(TopicSessionTest.class); } + + private static class SubscribingThread extends Thread + { + private final int _threadId; + private final String _topicName; + private final ExceptionListener _exceptionListener; + int msgId = 0; + + public SubscribingThread(final int threadId, final String topicName, final ExceptionListener exceptionListener) + { + _threadId = threadId; + _topicName = topicName; + _exceptionListener = exceptionListener; + } + + public void run() + { + try + { + System.err.println("Thread: " + _threadId); + + + if(_threadId >0) + { + + AMQConnection con2 = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test"); + //AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic2 = new AMQTopic(con2, _topicName); + TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber sub = session2.createSubscriber(topic2); + con2.setExceptionListener(_exceptionListener); + + + + final MessageListener messageListener = new MessageListener() + { + + public void onMessage(Message message) + { + try + { + msgId = message.getIntProperty("MessageId"); + if(msgId % 1000 == 0) + { + System.err.println("Thread: " + _threadId + ": " + msgId + "messages"); + } + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + }; + + + sub.setMessageListener(messageListener); + con2.start(); + + Thread.sleep(125000); + + +// Thread.sleep(1200000); + } + else + { + int messageId = 0; + + AMQConnection con = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test"); + //AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test"); + + + AMQTopic topic = new AMQTopic(con, _topicName); + + + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + + con.setExceptionListener(_exceptionListener); + + TopicPublisher publisher = session1.createPublisher(topic); + publisher.setDisableMessageID(true); + publisher.setDisableMessageTimestamp(true); + con.start(); + + Thread.sleep(5000); + + publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + while(messageId <= 240000) + //while(messageId <= 10000) + { + final TextMessage textMessage = session1.createTextMessage("hello"); + textMessage.setIntProperty("MessageId", messageId++); + + + publisher.publish(textMessage); + + } + } + + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } } |