summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-05-11 09:55:41 +0000
committerKeith Wall <kwall@apache.org>2012-05-11 09:55:41 +0000
commitbece1e05e7264e85168b0e564623792904d322d6 (patch)
tree95e69122feff11627adea8c78c1a55e1bcf5fae2
parent2097a70e0e26af15de393a1129fd3ebeccd7b96b (diff)
downloadqpid-python-bece1e05e7264e85168b0e564623792904d322d6.tar.gz
QPID-3979: [Java Broker] Conflation queues
Fix bug with checking of head() within the loop that would have prevented a queue entry from being conflated. Refactor code introducing more special Queue Entry to represent the corner cases. Added systest that exercises a conflation queue with multiple producers/single consumer. Applied patch by Phil Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1337094 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java76
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java400
2 files changed, 335 insertions, 141 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 0b95b9cc47..1e3bb3c50b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -37,6 +37,9 @@ public class ConflationQueueList extends SimpleQueueEntryList
private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+ private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
+ private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+
public ConflationQueueList(AMQQueue queue, String conflationKey)
{
super(queue);
@@ -54,60 +57,85 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueEntry(this, message);
}
+ /**
+ * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
+ */
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
+ final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
{
- final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(entry);
- AtomicReference<QueueEntry> latestValueReference = null;
- QueueEntry oldEntry;
+ final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
+ AtomicReference<QueueEntry> entryReferenceFromMap = null;
+ QueueEntry entryFromMap;
// Iterate until we have got a valid atomic reference object and either the referent is newer than the current
- // entry, or the current entry has replaced it in the reference. Note that the head represents a special value
+ // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
// indicating that the reference object is no longer valid (it is being removed from the map).
+ boolean keepTryingToUpdateEntryReference = true;
do
{
- latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry);
- oldEntry = latestValueReference == null ? null : latestValueReference.get();
+ do
+ {
+ entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);
+
+ // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)
+ entryFromMap = entryReferenceFromMap.get();
+ }
+ while(entryFromMap == _deleteInProgress);
+
+ boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;
+
+ keepTryingToUpdateEntryReference = entryFromMapIsOlder
+ && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
}
- while(oldEntry != null
- && oldEntry.compareTo(entry) < 0
- && oldEntry != getHead()
- && !latestValueReference.compareAndSet(oldEntry, entry));
+ while(keepTryingToUpdateEntryReference);
- if (oldEntry == null)
+ if (entryFromMap == _newerEntryAlreadyBeenAndGone)
{
- // Unlikely: A newer entry came along and was consumed (and entry removed from map)
- // during our processing of getOrPutIfAbsent(). In this case we know our entry has been superseded.
- discardEntry(entry);
+ discardEntry(addedEntry);
}
- else if (oldEntry.compareTo(entry) > 0)
+ else if (entryFromMap.compareTo(addedEntry) > 0)
{
// A newer entry came along
- discardEntry(entry);
+ discardEntry(addedEntry);
}
- else if (oldEntry.compareTo(entry) < 0)
+ else if (entryFromMap.compareTo(addedEntry) < 0)
{
// We replaced some other entry to become the newest value
- discardEntry(oldEntry);
+ discardEntry(entryFromMap);
}
- entry.setLatestValueReference(latestValueReference);
+ addedEntry.setLatestValueReference(entryReferenceFromMap);
}
- return entry;
+ return addedEntry;
}
- private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToValue)
+ /**
+ * Returns:
+ *
+ * <ul>
+ * <li>the existing entry reference if the value already exists in the map, or</li>
+ * <li>referenceToValue if none exists, or</li>
+ * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently
+ * adds and removes during execution of this method.</li>
+ * </ul>
+ */
+ private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
{
- AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue);
+ AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+
if(latestValueReference == null)
{
latestValueReference = _latestValuesMap.get(key);
+ if(latestValueReference == null)
+ {
+ return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+ }
}
return latestValueReference;
}
@@ -158,7 +186,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
{
if(super.delete())
{
- if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead()))
+ if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
{
Object key = getMessageHeader().getHeader(_conflationKey);
_latestValuesMap.remove(key,_latestValueReference);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
index 7404f18aa3..0e59e9cceb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
@@ -21,7 +21,9 @@
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -36,59 +38,65 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class ConflationQueueTest extends QpidBrokerTestCase
{
+ private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class);
+
+ private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg";
+ private static final String KEY_PROPERTY = "key";
private static final int MSG_COUNT = 400;
- private String queueName;
- private Connection producerConnection;
- private MessageProducer producer;
- private Session producerSession;
- private Queue queue;
- private Connection consumerConnection;
- private Session consumerSession;
- private MessageConsumer consumer;
+ private String _queueName;
+ private Queue _queue;
+ private Connection _producerConnection;
+ private MessageProducer _producer;
+ private Session _producerSession;
+ private Connection _consumerConnection;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
protected void setUp() throws Exception
{
super.setUp();
- queueName = getTestQueueName();
- producerConnection = getConnection();
- producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- producerConnection.start();
+ _queueName = getTestQueueName();
+ _producerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void testConflation() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(producerSession);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- producer.close();
- producerSession.close();
- producerConnection.close();
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -98,33 +106,33 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
public void testConflationWithRelease() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- createConflationQueue(producerSession);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession<?,?>)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -134,31 +142,31 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- consumerSession.close();
- consumerConnection.close();
+ _consumerSession.close();
+ _consumerConnection.close();
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession<?,?>)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -168,7 +176,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
@@ -176,26 +184,26 @@ public class ConflationQueueTest extends QpidBrokerTestCase
public void testConflationWithReleaseAfterNewPublish() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- createConflationQueue(producerSession);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession<?,?>)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -205,35 +213,35 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- consumer.close();
+ _consumer.close();
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession<?,?>)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
// this causes the "old" messages to be released
- consumerSession.close();
- consumerConnection.close();
+ _consumerSession.close();
+ _consumerConnection.close();
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -243,54 +251,54 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
public void testConflatedQueueDepth() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(producerSession);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- final long queueDepth = ((AMQSession<?, ?>)producerSession).getQueueDepth((AMQDestination)queue, true);
+ final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true);
assertEquals(10, queueDepth);
}
public void testConflationBrowser() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(producerSession);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- ((AMQSession<?,?>)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
- consumer = consumerSession.createConsumer(browseQueue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -300,57 +308,53 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
messages.clear();
- producer.send(nextMessage(MSG_COUNT, producerSession));
+ _producer.send(nextMessage(MSG_COUNT, _producerSession));
- ((AMQSession<?,?>)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
assertEquals("Unexpected number of messages received",1,messages.size());
- assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg"));
-
-
- producer.close();
- producerSession.close();
- producerConnection.close();
-
+ assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
}
-
public void testConflation2Browsers() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createConflationQueue(producerSession);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- ((AMQSession<?,?>)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
- consumer = consumerSession.createConsumer(browseQueue);
- MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
List<Message> messages = new ArrayList<Message>();
List<Message> messages2 = new ArrayList<Message>();
- Message received = consumer.receive(1000);
+ Message received = _consumer.receive(1000);
Message received2 = consumer2.receive(1000);
while(received!=null || received2!=null)
@@ -365,7 +369,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
- received = consumer.receive(1000);
+ received = _consumer.receive(1000);
received2 = consumer2.receive(1000);
}
@@ -376,32 +380,194 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
msg = messages2.get(i);
- assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
+ }
+
+ public void testParallelProductionAndConsumption() throws Exception
+ {
+ createConflationQueue(_producerSession);
+
+ // Start producing threads that send messages
+ BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1");
+ messageProducer1.startSendingMessages();
+ BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2");
+ messageProducer2.startSendingMessages();
+
+ Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1);
+
+ messageProducer1.join();
+ messageProducer2.join();
+
+ final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey();
+ assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size());
+ final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey();
+ assertEquals(lastSentMessages1, lastSentMessages2);
+
+ assertEquals("The last message sent for each key should match the last message received for that key",
+ lastSentMessages1, lastReceivedMessages);
+
+ assertNull("Unexpected exception from background producer thread", messageProducer1.getException());
+ }
+
+ private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception
+ {
+ producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
+
+ _consumerConnection = getConnection();
+ int smallPrefetchToEncourageConflation = 1;
+ _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation);
+
+ LOGGER.info("Starting to receive");
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>();
+
+ Message message;
+ int numberOfShutdownsReceived = 0;
+ int numberOfMessagesReceived = 0;
+ while(numberOfShutdownsReceived < 2)
+ {
+ message = _consumer.receive(10000);
+ assertNotNull(message);
+
+ if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
+ {
+ numberOfShutdownsReceived++;
+ }
+ else
+ {
+ numberOfMessagesReceived++;
+ putMessageInMap(message, messageSequenceNumbersByKey);
+ }
+ }
+
+ LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total");
+
+ return messageSequenceNumbersByKey;
+ }
+
+ private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException
+ {
+ String keyValue = message.getStringProperty(KEY_PROPERTY);
+ Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY);
+ messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber);
+ }
+
+ private class BackgroundMessageProducer
+ {
+ static final String SHUTDOWN = "SHUTDOWN";
+
+ private final String _threadName;
+
+ private volatile Exception _exception;
+
+ private Thread _thread;
+ private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>();
+ private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4);
+
+ public BackgroundMessageProducer(String threadName)
+ {
+ _threadName = threadName;
+ }
+
+ public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException
+ {
+ final long latchTimeout = 60000;
+ boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS);
+ assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success);
+ LOGGER.info("Quarter of messages sent");
+ }
+
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ public Map<String, Integer> getMessageSequenceNumbersByKey()
+ {
+ return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
}
+ public void startSendingMessages()
+ {
+ Runnable messageSender = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ LOGGER.info("Starting to send in background thread");
+ Connection producerConnection = getConnection();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer backgroundProducer = producerSession.createProducer(_queue);
+ for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++)
+ {
+ Message message = nextMessage(messageNumber, producerSession, 2);
+ backgroundProducer.send(message);
+
+ putMessageInMap(message, _messageSequenceNumbersByKey);
+ _quarterOfMessagesSentLatch.countDown();
+ }
+
+ Message shutdownMessage = producerSession.createMessage();
+ shutdownMessage.setBooleanProperty(SHUTDOWN, true);
+ backgroundProducer.send(shutdownMessage);
+
+ LOGGER.info("Finished sending in background thread");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ _thread = new Thread(messageSender);
+ _thread.setName(_threadName);
+ _thread.start();
+ }
- producer.close();
- producerSession.close();
- producerConnection.close();
+ public void join() throws InterruptedException
+ {
+ final int timeoutInMillis = 120000;
+ _thread.join(timeoutInMillis);
+ assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive());
+ }
}
private void createConflationQueue(Session session) throws AMQException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
- queue = new AMQQueue("amq.direct", queueName);
- ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ arguments.put("qpid.last_value_queue_key",KEY_PROPERTY);
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments);
+ _queue = new AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue);
}
private Message nextMessage(int msg, Session producerSession) throws JMSException
{
+ return nextMessage(msg, producerSession, 10);
+ }
+
+ private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException
+ {
Message send = producerSession.createTextMessage("Message: " + msg);
- send.setStringProperty("key", String.valueOf(msg % 10));
- send.setIntProperty("msg", msg);
+ send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues));
+ send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
return send;
}