summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java15
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java73
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java225
17 files changed, 355 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index c3219e6564..bba39403a5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -182,6 +182,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_fastAccessConsumers[i] = null;
}
}
+
+
+
+ public String toString()
+ {
+ return "{ Fast: " + Arrays.asList(_fastAccessConsumers) + " ; Slow: " + _slowAccessConsumers + "}";
+ }
}
@@ -299,9 +306,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final IdToConsumerMap _consumers = new IdToConsumerMap();
- //Map<AMQShortString, BasicMessageConsumer> _consumers =
- //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
-
/**
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
@@ -1419,7 +1423,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (message.isDeliverMessage())
{
_highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
+
_queue.add(message);
+
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index efbce6033b..5b1c1aeeee 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -809,9 +809,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/** Acknowledge up to last message delivered (if any). Used when commiting. */
void acknowledgeDelivered()
{
- while (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
}
}
@@ -1017,7 +1017,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
rollback();
- }
+ }
clearReceiveQueue();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index d05e99d210..aa7599f355 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -46,7 +46,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
{
final AMQProtocolSession session = stateManager.getProtocolSession();
final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedDeliverMessage(body);
- _logger.debug("New JmsDeliver method received");
+ _logger.debug("New JmsDeliver method received");
session.unprocessedMessageReceived(channelId, msg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 6a5cc62bfc..d19cd7f0d8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -297,15 +297,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
}
- /*try
- {*/
+
msg.receiveBody(contentBody);
- /*}
- catch (UnexpectedBodyReceivedException e)
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- throw e;
- }*/
if (msg.isAllBodyDataReceived())
{
@@ -324,9 +317,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* @param channelId the channel id the message should be delivered to
* @param msg the message
*/
- private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
+ private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) throws AMQException
{
AMQSession session = getSession(channelId);
+ if(session == null)
+ {
+ throw new AMQException("Error: received message on non-existant channel:" + channelId);
+ }
session.messageReceived(msg);
if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
index fe418535d6..ec45d7e182 100644
--- a/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
@@ -126,7 +126,7 @@ public class AMQQueueDeferredOrderingTest extends TestCase
_logger.info("Consuming messages");
for (int i = 0; i < NUM_MESSAGES; i++)
{
- Message msg = consumer.receive(3000);
+ Message msg = consumer.receive(90000);
assertNotNull("Message should not be null", msg);
assertTrue("Message should be a text message", msg instanceof TextMessage);
assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 20632e245f..6e19f53ffe 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -128,7 +128,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
{
int msg = 0;
int MAX_LOOPS = MSG_COUNT * 2;
- for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++)
+ for (int loops = 0; (msg < MSG_COUNT) && (loops < MAX_LOOPS); loops++)
{
if (_consumer1.receive(100) != null)
@@ -220,12 +220,18 @@ public class MessageListenerMultiConsumerTest extends TestCase
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer1.receive() != null);
+
+ final Message message = _consumer1.receive(100000);
+ if(message == null)
+ {
+ System.out.println("!!!!!!!! " + msg);
+ }
+ assertTrue(message != null);
}
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(consumer2.receive() != null);
+ assertTrue(consumer2.receive(10000) != null);
}
}
else
@@ -235,12 +241,12 @@ public class MessageListenerMultiConsumerTest extends TestCase
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer1.receive() != null);
+ assertTrue(_consumer1.receive(10000) != null);
}
for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer2.receive() != null);
+ assertTrue(_consumer2.receive(10000) != null);
}
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
index 21f3e273aa..81d9a39dd3 100644
--- a/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -116,6 +116,7 @@ public class ResetMessageListenerTest extends TestCase
{
_producer.send(_producerSession.createTextMessage("Message " + msg));
}
+// Thread.sleep(120000);
}
@@ -247,6 +248,14 @@ public class ResetMessageListenerTest extends TestCase
{
_producer.send(_producerSession.createTextMessage("Message " + msg));
}
+// try
+// {
+// Thread.sleep(120000);
+// }
+// catch (InterruptedException e)
+// {
+// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+// }
}
catch (JMSException e)
{
@@ -257,7 +266,7 @@ public class ResetMessageListenerTest extends TestCase
try
{
- _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS);
+ _allSecondMessagesSent.await(500000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index b6f46b4acc..9b34c36ec6 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -115,6 +115,7 @@ public class RecoverTest extends TestCase
consumerSession.recover();
tm = (TextMessage) consumer.receiveNoWait();
+
assertNull(tm);
_logger.info("No messages redelivered as is expected");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
index cc18169a5b..56247f9634 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
@@ -28,6 +28,9 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +52,7 @@ public class SessionStartTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
+
init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index 559e9a4741..3c2e72c07e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -69,6 +69,7 @@ public class ChannelCloseOkTest extends TestCase
private static final Logger _log = LoggerFactory.getLogger(ChannelCloseOkTest.class);
public String _connectionString = "vm://:1";
+ private static final int NUM_MESSAGES = 300;
protected void setUp() throws Exception
{
@@ -170,19 +171,18 @@ public class ChannelCloseOkTest extends TestCase
// Ensure both sessions are still ok.
// Send a bunch of messages as this give time for the sessions to be erroneously closed.
- final int num = 300;
- for (int i = 0; i < num; ++i)
+ for (int i = 0; i < NUM_MESSAGES; ++i)
{
send(_session1, _destination1, "" + i);
send(_session2, _destination2, "" + i);
}
- waitFor(_received1, num + 1);
- waitFor(_received2, num + 1);
+ waitFor(_received1, NUM_MESSAGES + 1);
+ waitFor(_received2, NUM_MESSAGES + 1);
// Note that the third message is never received as it is sent to an incorrect destination.
- assertEquals(num + 1, _received1.size());
- assertEquals(num + 1, _received2.size());
+ assertEquals(NUM_MESSAGES + 1, _received1.size());
+ assertEquals(NUM_MESSAGES + 1, _received2.size());
}
private void sendAndWait(Session session, Destination destination, String message, List<Message> received, int count)
@@ -199,15 +199,17 @@ public class ChannelCloseOkTest extends TestCase
producer1.send(session.createTextMessage(message));
}
- private void waitFor(List<Message> received, int count) throws InterruptedException
+ private void waitFor(List<Message> received, final int count) throws InterruptedException
{
+ int lastSeen = -1;
synchronized (received)
{
- while (received.size() < count)
+ while ((lastSeen != received.size()) && (lastSeen = received.size()) < count)
{
+
try
{
- received.wait();
+ received.wait(2000L);
}
catch (InterruptedException e)
{
@@ -216,6 +218,10 @@ public class ChannelCloseOkTest extends TestCase
}
}
}
+ if(received.size() < count)
+ {
+ throw new RuntimeException("Expected: " + count + " got: " + received.size());
+ }
}
private static String randomize(String in)
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
index 19ef612bcc..2ee29e3da4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -75,7 +75,9 @@ public class Client implements MessageListener
public synchronized void onMessage(Message response)
{
+
_logger.info("Received " + (++_count) + " of " + _expected + " responses.");
+
if (_count == _expected)
{
@@ -89,10 +91,10 @@ public class Client implements MessageListener
if (_count < _expected)
{
- wait(10000L);
+ wait(1000L);
}
- if (_count < _expected)
+ if (_count != _expected)
{
throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
index 9cde24dd92..81227b9540 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
@@ -22,9 +22,13 @@ package org.apache.qpid.test.unit.client.forwardall;
import junit.framework.TestCase;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+import java.util.HashSet;
+
/**
* Runs the Service's and Client parts of the test in the same process
* as the broker
@@ -34,6 +38,7 @@ public class CombinedTest extends TestCase
private static final Logger _logger = LoggerFactory.getLogger(CombinedTest.class);
private int run = 0;
+
protected void setUp() throws Exception
{
super.setUp();
@@ -47,16 +52,16 @@ public class CombinedTest extends TestCase
public void testForwardAll() throws Exception
{
- while (run < 10)
+ while (run < 100)
{
int services = 2;
ServiceCreator.start("vm://:1", services);
-
+ Thread.sleep(100);
_logger.info("Starting " + ++run + " client...");
new Client("vm://:1", services).shutdownWhenComplete();
-
+ ServiceCreator.closeAll();
_logger.info("Completed " + run + " successfully!");
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
index 6593f7d86a..bf03ce6899 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
@@ -37,14 +37,16 @@ public class Service implements MessageListener
{
private final AMQConnection _connection;
private final AMQSession _session;
+ private final int _id;
- Service(String broker) throws Exception
+ Service(String broker, int id) throws Exception
{
- this(connect(broker));
+ this(connect(broker), id);
}
- Service(AMQConnection connection) throws Exception
+ Service(AMQConnection connection, int id) throws Exception
{
+ _id = id;
_connection = connection;
AMQQueue queue = new SpecialQueue(connection, "ServiceQueue");
_session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -56,7 +58,7 @@ public class Service implements MessageListener
{
try
{
- Message response = _session.createTextMessage("Response!");
+ Message response = _session.createTextMessage("Response! " + _id);
Destination replyTo = request.getJMSReplyTo();
_session.createProducer(replyTo).send(response);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
index be16f6b7ae..310a0993bc 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
@@ -34,17 +34,19 @@ public class ServiceCreator implements Runnable
private final String broker;
private Service service;
+ private final int id;
- ServiceCreator(String broker)
+ ServiceCreator(String broker, final int id)
{
this.broker = broker;
+ this.id = id;
}
public void run()
{
try
{
- service = new Service(broker);
+ service = new Service(broker, id);
}
catch (Exception e)
{
@@ -76,11 +78,12 @@ public class ServiceCreator implements Runnable
{
threads = new Thread[services];
_services = new ServiceCreator[services];
- ServiceCreator runner = new ServiceCreator(broker);
+ //ServiceCreator runner = new ServiceCreator(broker);
// start services
_logger.info("Starting " + services + " services...");
for (int i = 0; i < services; i++)
{
+ ServiceCreator runner = new ServiceCreator(broker,i);
threads[i] = new Thread(runner);
_services[i] = runner;
threads[i].start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index 8d7645c1fd..56904f20de 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -40,30 +40,40 @@ import javax.jms.Queue;
import javax.jms.Session;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Random;
+import java.util.UUID;
public class MessageRequeueTest extends TestCase
{
private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class);
+
+
protected static AtomicInteger consumerIds = new AtomicInteger(0);
protected final Integer numTestMessages = 150;
protected final int consumeTimeout = 3000;
- protected final String queue = "direct://amq.direct//queue";
+ //protected final String queue = "direct://amq.direct//queue";
protected String payload = "Message:";
protected final String BROKER = "vm://:1";
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
- private boolean passed = false;
+ //private boolean passed = false;
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
+
+ }
+
+ private void putMessagesOnQueueThenClose(String queue)
+ throws JMSException, InterruptedException
+ {
QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -76,20 +86,25 @@ public class MessageRequeueTest extends TestCase
conn.disconnect();
}
- protected void tearDown() throws Exception
+
+
+ private void tearDownQueue(String queue)
+ throws JMSException, InterruptedException
+
{
- super.tearDown();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
- if (!passed) // clean up
- {
- QpidClientConnection conn = new QpidClientConnection(BROKER);
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
+ conn.disconnect();
+ }
- conn.disconnect();
- }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
TransportConnection.killVMBroker(1);
}
@@ -102,6 +117,11 @@ public class MessageRequeueTest extends TestCase
*/
public void testDrain() throws JMSException, InterruptedException
{
+
+ String queue = "direct://amq.direct//queue" + UUID.randomUUID();
+
+ putMessagesOnQueueThenClose(queue);
+
QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -172,18 +192,22 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
- passed = true;
+ tearDownQueue(queue);
}
/** multiple consumers
* Based on code subbmitted by client FT-304
*/
- public void testCompetingConsumers()
+ public void testCompetingConsumers() throws JMSException, InterruptedException
{
- Consumer c1 = new Consumer();
- Consumer c2 = new Consumer();
- Consumer c3 = new Consumer();
- Consumer c4 = new Consumer();
+ String queue = "direct://amq.direct//queue" + UUID.randomUUID();
+
+ putMessagesOnQueueThenClose(queue);
+
+ Consumer c1 = new Consumer(queue);
+ Consumer c2 = new Consumer(queue);
+ Consumer c3 = new Consumer(queue);
+ Consumer c4 = new Consumer(queue);
Thread t1 = new Thread(c1);
Thread t2 = new Thread(c2);
@@ -237,16 +261,18 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertTrue("number of consumed messages does not match initial data: " + totalConsumed, numTestMessages <= totalConsumed);
- passed = true;
+ tearDownQueue(queue);
}
class Consumer implements Runnable
{
private Integer count = 0;
private Integer id;
+ private final String _queue;
- public Consumer()
+ public Consumer(String queue)
{
+ _queue = queue;
id = consumerIds.addAndGet(1);
}
@@ -263,7 +289,7 @@ public class MessageRequeueTest extends TestCase
Message result;
do
{
- result = conn.getNextMessage(queue, consumeTimeout);
+ result = conn.getNextMessage(_queue, consumeTimeout);
if (result != null)
{
@@ -322,8 +348,11 @@ public class MessageRequeueTest extends TestCase
}
}
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException
{
+ String queue = "direct://amq.direct//queue" + UUID.randomUUID();
+ putMessagesOnQueueThenClose(queue);
+
int run = 0;
// while (run < 10)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 101cba2352..98c0225096 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -88,7 +88,7 @@ public class DurableSubscriptionTest extends TestCase
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
- msg = consumer1.receive();
+ msg = consumer1.receive(1000);
assertEquals("A", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(1000);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 065b06a87d..39730ef3ac 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -20,28 +20,33 @@
*/
package org.apache.qpid.test.unit.topic;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
+import javax.jms.*;
import javax.jms.MessageConsumer;
import javax.jms.Session;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
+import javax.jms.Message;
import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.jms.*;
+
+import java.util.UUID;
/** @author Apache Software Foundation */
public class TopicSessionTest extends TestCase
{
private static final String BROKER = "vm://:1";
+ private static final int THREADS = 20;
+ private static final int MESSAGE_COUNT = 10000;
+ private static final int MESSAGE_SIZE = 128;
protected void setUp() throws Exception
{
@@ -102,6 +107,60 @@ public class TopicSessionTest extends TestCase
subscriptionNameReuseForDifferentTopic(true);
}
+ public void notestSilly() throws Exception
+ {
+
+
+ final ExceptionListener listener = new ExceptionListener()
+ {
+ public void onException(JMSException jmsException)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
+
+
+ Thread[] threads = new Thread[100];
+
+ for(int j = 0; j < 20; j++)
+ {
+ threads[j] = new Thread(new Runnable() {
+ public void run()
+ {
+ try
+ {
+ AMQConnection con = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con, "MyTopic1" + UUID.randomUUID());
+
+
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ con.setExceptionListener(listener);
+
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ con.start();
+
+ while(true)
+ {
+ publisher.publish(session1.createTextMessage("hello"));
+ Thread.sleep(THREADS);
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+ threads[j].run();
+ }
+
+ threads[0].join();
+
+ }
+
+
private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
{
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
@@ -368,8 +427,160 @@ public class TopicSessionTest extends TestCase
con2.close();
}
+
+ public void noTestPublishToManyConsumers() throws Exception
+ {
+
+
+ final ExceptionListener exceptionListener = new ExceptionListener()
+ {
+ public void onException(JMSException jmsException)
+ {
+ jmsException.printStackTrace();
+ }
+ };
+
+
+
+ SubscribingThread[] threads = new SubscribingThread[100];
+
+ final String topicName = "MyTopic1" + UUID.randomUUID();
+ for(int j = 0; j < 21; j++)
+ {
+ final int threadId = j;
+ threads[threadId] = new SubscribingThread(threadId, topicName, exceptionListener);
+ threads[j].start();
+ Thread.sleep(100);
+ }
+
+
+ threads[1].join();
+
+ int totalMessages = 0;
+
+ for(int j = 1; j < 21; j++)
+ {
+
+ System.err.println("Thread " + j + ": " + threads[j].msgId);
+ totalMessages += threads[j].msgId;
+ }
+
+ System.err.println("****** Total: " + totalMessages);
+
+
+ }
+
+
+
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(TopicSessionTest.class);
}
+
+ private static class SubscribingThread extends Thread
+ {
+ private final int _threadId;
+ private final String _topicName;
+ private final ExceptionListener _exceptionListener;
+ int msgId = 0;
+
+ public SubscribingThread(final int threadId, final String topicName, final ExceptionListener exceptionListener)
+ {
+ _threadId = threadId;
+ _topicName = topicName;
+ _exceptionListener = exceptionListener;
+ }
+
+ public void run()
+ {
+ try
+ {
+ System.err.println("Thread: " + _threadId);
+
+
+ if(_threadId >0)
+ {
+
+ AMQConnection con2 = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test");
+ //AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic2 = new AMQTopic(con2, _topicName);
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber sub = session2.createSubscriber(topic2);
+ con2.setExceptionListener(_exceptionListener);
+
+
+
+ final MessageListener messageListener = new MessageListener()
+ {
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ msgId = message.getIntProperty("MessageId");
+ if(msgId % 1000 == 0)
+ {
+ System.err.println("Thread: " + _threadId + ": " + msgId + "messages");
+ }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ };
+
+
+ sub.setMessageListener(messageListener);
+ con2.start();
+
+ Thread.sleep(125000);
+
+
+// Thread.sleep(1200000);
+ }
+ else
+ {
+ int messageId = 0;
+
+ AMQConnection con = new AMQConnection("tcp://127.0.0.1:5672?retries='0'", "guest", "guest", "test", "test");
+ //AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test");
+
+
+ AMQTopic topic = new AMQTopic(con, _topicName);
+
+
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ con.setExceptionListener(_exceptionListener);
+
+ TopicPublisher publisher = session1.createPublisher(topic);
+ publisher.setDisableMessageID(true);
+ publisher.setDisableMessageTimestamp(true);
+ con.start();
+
+ Thread.sleep(5000);
+
+ publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ while(messageId <= 240000)
+ //while(messageId <= 10000)
+ {
+ final TextMessage textMessage = session1.createTextMessage("hello");
+ textMessage.setIntProperty("MessageId", messageId++);
+
+
+ publisher.publish(textMessage);
+
+ }
+ }
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
}