From bece1e05e7264e85168b0e564623792904d322d6 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 11 May 2012 09:55:41 +0000 Subject: QPID-3979: [Java Broker] Conflation queues Fix bug with checking of head() within the loop that would have prevented a queue entry from being conflated. Refactor code introducing more special Queue Entry to represent the corner cases. Added systest that exercises a conflation queue with multiple producers/single consumer. Applied patch by Phil Harvey , and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1337094 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/queue/ConflationQueueList.java | 76 ++-- .../qpid/server/queue/ConflationQueueTest.java | 400 +++++++++++++++------ 2 files changed, 335 insertions(+), 141 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 0b95b9cc47..1e3bb3c50b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -37,6 +37,9 @@ public class ConflationQueueList extends SimpleQueueEntryList private final ConcurrentHashMap> _latestValuesMap = new ConcurrentHashMap>(); + private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this); + private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this); + public ConflationQueueList(AMQQueue queue, String conflationKey) { super(queue); @@ -54,60 +57,85 @@ public class ConflationQueueList extends SimpleQueueEntryList return new ConflationQueueEntry(this, message); } + /** + * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary. + */ @Override public ConflationQueueEntry add(final ServerMessage message) { - final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); + final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message)); final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); if (keyValue != null) { - final AtomicReference referenceToEntry = new AtomicReference(entry); - AtomicReference latestValueReference = null; - QueueEntry oldEntry; + final AtomicReference referenceToEntry = new AtomicReference(addedEntry); + AtomicReference entryReferenceFromMap = null; + QueueEntry entryFromMap; // Iterate until we have got a valid atomic reference object and either the referent is newer than the current - // entry, or the current entry has replaced it in the reference. Note that the head represents a special value + // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value // indicating that the reference object is no longer valid (it is being removed from the map). + boolean keepTryingToUpdateEntryReference = true; do { - latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry); - oldEntry = latestValueReference == null ? null : latestValueReference.get(); + do + { + entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry); + + // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value) + entryFromMap = entryReferenceFromMap.get(); + } + while(entryFromMap == _deleteInProgress); + + boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0; + + keepTryingToUpdateEntryReference = entryFromMapIsOlder + && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry); } - while(oldEntry != null - && oldEntry.compareTo(entry) < 0 - && oldEntry != getHead() - && !latestValueReference.compareAndSet(oldEntry, entry)); + while(keepTryingToUpdateEntryReference); - if (oldEntry == null) + if (entryFromMap == _newerEntryAlreadyBeenAndGone) { - // Unlikely: A newer entry came along and was consumed (and entry removed from map) - // during our processing of getOrPutIfAbsent(). In this case we know our entry has been superseded. - discardEntry(entry); + discardEntry(addedEntry); } - else if (oldEntry.compareTo(entry) > 0) + else if (entryFromMap.compareTo(addedEntry) > 0) { // A newer entry came along - discardEntry(entry); + discardEntry(addedEntry); } - else if (oldEntry.compareTo(entry) < 0) + else if (entryFromMap.compareTo(addedEntry) < 0) { // We replaced some other entry to become the newest value - discardEntry(oldEntry); + discardEntry(entryFromMap); } - entry.setLatestValueReference(latestValueReference); + addedEntry.setLatestValueReference(entryReferenceFromMap); } - return entry; + return addedEntry; } - private AtomicReference getOrPutIfAbsent(final Object key, final AtomicReference referenceToValue) + /** + * Returns: + * + *
    + *
  • the existing entry reference if the value already exists in the map, or
  • + *
  • referenceToValue if none exists, or
  • + *
  • a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently + * adds and removes during execution of this method.
  • + *
+ */ + private AtomicReference getOrPutIfAbsent(final Object key, final AtomicReference referenceToAddedValue) { - AtomicReference latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue); + AtomicReference latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); + if(latestValueReference == null) { latestValueReference = _latestValuesMap.get(key); + if(latestValueReference == null) + { + return new AtomicReference(_newerEntryAlreadyBeenAndGone); + } } return latestValueReference; } @@ -158,7 +186,7 @@ public class ConflationQueueList extends SimpleQueueEntryList { if(super.delete()) { - if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead())) + if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) { Object key = getMessageHeader().getHeader(_conflationKey); _latestValuesMap.remove(key,_latestValueReference); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java index 7404f18aa3..0e59e9cceb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java @@ -21,7 +21,9 @@ package org.apache.qpid.server.queue; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -36,59 +38,65 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class ConflationQueueTest extends QpidBrokerTestCase { + private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class); + + private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; + private static final String KEY_PROPERTY = "key"; private static final int MSG_COUNT = 400; - private String queueName; - private Connection producerConnection; - private MessageProducer producer; - private Session producerSession; - private Queue queue; - private Connection consumerConnection; - private Session consumerSession; - private MessageConsumer consumer; + private String _queueName; + private Queue _queue; + private Connection _producerConnection; + private MessageProducer _producer; + private Session _producerSession; + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; protected void setUp() throws Exception { super.setUp(); - queueName = getTestQueueName(); - producerConnection = getConnection(); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - producerConnection.start(); + _queueName = getTestQueueName(); + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void testConflation() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - producer.close(); - producerSession.close(); - producerConnection.close(); + _producer.close(); + _producerSession.close(); + _producerConnection.close(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List messages = new ArrayList(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -98,33 +106,33 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } public void testConflationWithRelease() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List messages = new ArrayList(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -134,31 +142,31 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - consumerSession.close(); - consumerConnection.close(); + _consumerSession.close(); + _consumerConnection.close(); - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); messages = new ArrayList(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -168,7 +176,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } @@ -176,26 +184,26 @@ public class ConflationQueueTest extends QpidBrokerTestCase public void testConflationWithReleaseAfterNewPublish() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List messages = new ArrayList(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -205,35 +213,35 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - consumer.close(); + _consumer.close(); for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession)producerSession).sync(); + ((AMQSession)_producerSession).sync(); // this causes the "old" messages to be released - consumerSession.close(); - consumerConnection.close(); + _consumerSession.close(); + _consumerConnection.close(); - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); messages = new ArrayList(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -243,54 +251,54 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } public void testConflatedQueueDepth() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - final long queueDepth = ((AMQSession)producerSession).getQueueDepth((AMQDestination)queue, true); + final long queueDepth = ((AMQSession)_producerSession).getQueueDepth((AMQDestination)_queue, true); assertEquals(10, queueDepth); } public void testConflationBrowser() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - ((AMQSession)producerSession).sync(); + ((AMQSession)_producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); - consumer = consumerSession.createConsumer(browseQueue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); Message received; List messages = new ArrayList(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -300,57 +308,53 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } messages.clear(); - producer.send(nextMessage(MSG_COUNT, producerSession)); + _producer.send(nextMessage(MSG_COUNT, _producerSession)); - ((AMQSession)producerSession).sync(); + ((AMQSession)_producerSession).sync(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } assertEquals("Unexpected number of messages received",1,messages.size()); - assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg")); - - - producer.close(); - producerSession.close(); - producerConnection.close(); - + assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + _producer.close(); + _producerSession.close(); + _producerConnection.close(); } - public void testConflation2Browsers() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - ((AMQSession)producerSession).sync(); + ((AMQSession)_producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); - consumer = consumerSession.createConsumer(browseQueue); - MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(browseQueue); + MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); List messages = new ArrayList(); List messages2 = new ArrayList(); - Message received = consumer.receive(1000); + Message received = _consumer.receive(1000); Message received2 = consumer2.receive(1000); while(received!=null || received2!=null) @@ -365,7 +369,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase } - received = consumer.receive(1000); + received = _consumer.receive(1000); received2 = consumer2.receive(1000); } @@ -376,32 +380,194 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); msg = messages2.get(i); - assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + } + + public void testParallelProductionAndConsumption() throws Exception + { + createConflationQueue(_producerSession); + + // Start producing threads that send messages + BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1"); + messageProducer1.startSendingMessages(); + BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2"); + messageProducer2.startSendingMessages(); + + Map lastReceivedMessages = receiveMessages(messageProducer1); + + messageProducer1.join(); + messageProducer2.join(); + + final Map lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); + assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); + final Map lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); + assertEquals(lastSentMessages1, lastSentMessages2); + + assertEquals("The last message sent for each key should match the last message received for that key", + lastSentMessages1, lastReceivedMessages); + + assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); + } + + private Map receiveMessages(BackgroundMessageProducer producer) throws Exception + { + producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); + + _consumerConnection = getConnection(); + int smallPrefetchToEncourageConflation = 1; + _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation); + + LOGGER.info("Starting to receive"); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + Map messageSequenceNumbersByKey = new HashMap(); + + Message message; + int numberOfShutdownsReceived = 0; + int numberOfMessagesReceived = 0; + while(numberOfShutdownsReceived < 2) + { + message = _consumer.receive(10000); + assertNotNull(message); + + if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) + { + numberOfShutdownsReceived++; + } + else + { + numberOfMessagesReceived++; + putMessageInMap(message, messageSequenceNumbersByKey); + } + } + + LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); + + return messageSequenceNumbersByKey; + } + + private void putMessageInMap(Message message, Map messageSequenceNumbersByKey) throws JMSException + { + String keyValue = message.getStringProperty(KEY_PROPERTY); + Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); + messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); + } + + private class BackgroundMessageProducer + { + static final String SHUTDOWN = "SHUTDOWN"; + + private final String _threadName; + + private volatile Exception _exception; + + private Thread _thread; + private Map _messageSequenceNumbersByKey = new HashMap(); + private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4); + + public BackgroundMessageProducer(String threadName) + { + _threadName = threadName; + } + + public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException + { + final long latchTimeout = 60000; + boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); + assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success); + LOGGER.info("Quarter of messages sent"); + } + + public Exception getException() + { + return _exception; + } + + public Map getMessageSequenceNumbersByKey() + { + return Collections.unmodifiableMap(_messageSequenceNumbersByKey); } + public void startSendingMessages() + { + Runnable messageSender = new Runnable() + { + @Override + public void run() + { + try + { + LOGGER.info("Starting to send in background thread"); + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer backgroundProducer = producerSession.createProducer(_queue); + for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) + { + Message message = nextMessage(messageNumber, producerSession, 2); + backgroundProducer.send(message); + + putMessageInMap(message, _messageSequenceNumbersByKey); + _quarterOfMessagesSentLatch.countDown(); + } + + Message shutdownMessage = producerSession.createMessage(); + shutdownMessage.setBooleanProperty(SHUTDOWN, true); + backgroundProducer.send(shutdownMessage); + + LOGGER.info("Finished sending in background thread"); + } + catch (Exception e) + { + _exception = e; + throw new RuntimeException(e); + } + } + }; + + _thread = new Thread(messageSender); + _thread.setName(_threadName); + _thread.start(); + } - producer.close(); - producerSession.close(); - producerConnection.close(); + public void join() throws InterruptedException + { + final int timeoutInMillis = 120000; + _thread.join(timeoutInMillis); + assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); + } } private void createConflationQueue(Session session) throws AMQException { final Map arguments = new HashMap(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); - queue = new AMQQueue("amq.direct", queueName); - ((AMQSession) session).declareAndBind((AMQDestination)queue); + arguments.put("qpid.last_value_queue_key",KEY_PROPERTY); + ((AMQSession) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); + _queue = new AMQQueue("amq.direct", _queueName); + ((AMQSession) session).declareAndBind((AMQDestination)_queue); } private Message nextMessage(int msg, Session producerSession) throws JMSException + { + return nextMessage(msg, producerSession, 10); + } + + private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException { Message send = producerSession.createTextMessage("Message: " + msg); - send.setStringProperty("key", String.valueOf(msg % 10)); - send.setIntProperty("msg", msg); + send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues)); + send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); return send; } -- cgit v1.2.1