diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 8 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java | 8 |
2 files changed, 12 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 7ce81aeea2..41880ee11e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -277,6 +277,14 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDispatcherIfNecessary(); + + // If we already have messages on the queue, deliver them to the listener + Object o = _synchronousQueue.poll(); + while (o != null) + { + messageListener.onMessage((Message) o); + o = _synchronousQueue.poll(); + } } } } diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java index 12b84b1495..3b7302df62 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -132,16 +132,16 @@ public class MessageListenerTest extends QpidTestCase implements MessageListener } - public void testRecieveTheUseMessageListener() throws Exception + public void testRecieveThenUseMessageListener() throws Exception { _logger.error("Test disabled as initial receive is not called first"); // Perform initial receive to start connection - // assertTrue(_consumer.receive(2000) != null); - // receivedCount++; + assertTrue(_consumer.receive(2000) != null); + receivedCount++; // Sleep to ensure remaining 4 msgs end up on _synchronousQueue - // Thread.sleep(1000); + Thread.sleep(1000); // Set the message listener and wait for the messages to come in. _consumer.setMessageListener(this); |