diff options
author | Keith Wall <kwall@apache.org> | 2012-05-11 09:55:41 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-05-11 09:55:41 +0000 |
commit | bece1e05e7264e85168b0e564623792904d322d6 (patch) | |
tree | 95e69122feff11627adea8c78c1a55e1bcf5fae2 | |
parent | 2097a70e0e26af15de393a1129fd3ebeccd7b96b (diff) | |
download | qpid-python-bece1e05e7264e85168b0e564623792904d322d6.tar.gz |
QPID-3979: [Java Broker] Conflation queues
Fix bug with checking of head() within the loop that would have prevented a queue entry from being conflated.
Refactor code introducing more special Queue Entry to represent the corner cases.
Added systest that exercises a conflation queue with multiple producers/single consumer.
Applied patch by Phil Harvey <phil@philharveyonline.com>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1337094 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java | 76 | ||||
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java | 400 |
2 files changed, 335 insertions, 141 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 0b95b9cc47..1e3bb3c50b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -37,6 +37,9 @@ public class ConflationQueueList extends SimpleQueueEntryList private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap = new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>(); + private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this); + private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this); + public ConflationQueueList(AMQQueue queue, String conflationKey) { super(queue); @@ -54,60 +57,85 @@ public class ConflationQueueList extends SimpleQueueEntryList return new ConflationQueueEntry(this, message); } + /** + * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary. + */ @Override public ConflationQueueEntry add(final ServerMessage message) { - final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); + final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message)); final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); if (keyValue != null) { - final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(entry); - AtomicReference<QueueEntry> latestValueReference = null; - QueueEntry oldEntry; + final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry); + AtomicReference<QueueEntry> entryReferenceFromMap = null; + QueueEntry entryFromMap; // Iterate until we have got a valid atomic reference object and either the referent is newer than the current - // entry, or the current entry has replaced it in the reference. Note that the head represents a special value + // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value // indicating that the reference object is no longer valid (it is being removed from the map). + boolean keepTryingToUpdateEntryReference = true; do { - latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry); - oldEntry = latestValueReference == null ? null : latestValueReference.get(); + do + { + entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry); + + // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value) + entryFromMap = entryReferenceFromMap.get(); + } + while(entryFromMap == _deleteInProgress); + + boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0; + + keepTryingToUpdateEntryReference = entryFromMapIsOlder + && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry); } - while(oldEntry != null - && oldEntry.compareTo(entry) < 0 - && oldEntry != getHead() - && !latestValueReference.compareAndSet(oldEntry, entry)); + while(keepTryingToUpdateEntryReference); - if (oldEntry == null) + if (entryFromMap == _newerEntryAlreadyBeenAndGone) { - // Unlikely: A newer entry came along and was consumed (and entry removed from map) - // during our processing of getOrPutIfAbsent(). In this case we know our entry has been superseded. - discardEntry(entry); + discardEntry(addedEntry); } - else if (oldEntry.compareTo(entry) > 0) + else if (entryFromMap.compareTo(addedEntry) > 0) { // A newer entry came along - discardEntry(entry); + discardEntry(addedEntry); } - else if (oldEntry.compareTo(entry) < 0) + else if (entryFromMap.compareTo(addedEntry) < 0) { // We replaced some other entry to become the newest value - discardEntry(oldEntry); + discardEntry(entryFromMap); } - entry.setLatestValueReference(latestValueReference); + addedEntry.setLatestValueReference(entryReferenceFromMap); } - return entry; + return addedEntry; } - private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToValue) + /** + * Returns: + * + * <ul> + * <li>the existing entry reference if the value already exists in the map, or</li> + * <li>referenceToValue if none exists, or</li> + * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently + * adds and removes during execution of this method.</li> + * </ul> + */ + private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue) { - AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue); + AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); + if(latestValueReference == null) { latestValueReference = _latestValuesMap.get(key); + if(latestValueReference == null) + { + return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone); + } } return latestValueReference; } @@ -158,7 +186,7 @@ public class ConflationQueueList extends SimpleQueueEntryList { if(super.delete()) { - if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead())) + if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) { Object key = getMessageHeader().getHeader(_conflationKey); _latestValuesMap.remove(key,_latestValueReference); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java index 7404f18aa3..0e59e9cceb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java @@ -21,7 +21,9 @@ package org.apache.qpid.server.queue; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -36,59 +38,65 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class ConflationQueueTest extends QpidBrokerTestCase { + private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class); + + private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; + private static final String KEY_PROPERTY = "key"; private static final int MSG_COUNT = 400; - private String queueName; - private Connection producerConnection; - private MessageProducer producer; - private Session producerSession; - private Queue queue; - private Connection consumerConnection; - private Session consumerSession; - private MessageConsumer consumer; + private String _queueName; + private Queue _queue; + private Connection _producerConnection; + private MessageProducer _producer; + private Session _producerSession; + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; protected void setUp() throws Exception { super.setUp(); - queueName = getTestQueueName(); - producerConnection = getConnection(); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - producerConnection.start(); + _queueName = getTestQueueName(); + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void testConflation() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - producer.close(); - producerSession.close(); - producerConnection.close(); + _producer.close(); + _producerSession.close(); + _producerConnection.close(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -98,33 +106,33 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } public void testConflationWithRelease() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession<?,?>)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -134,31 +142,31 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - consumerSession.close(); - consumerConnection.close(); + _consumerSession.close(); + _consumerConnection.close(); - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession<?,?>)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -168,7 +176,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } @@ -176,26 +184,26 @@ public class ConflationQueueTest extends QpidBrokerTestCase public void testConflationWithReleaseAfterNewPublish() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT/2; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession<?,?>)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -205,35 +213,35 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - consumer.close(); + _consumer.close(); for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } // HACK to do something synchronous - ((AMQSession<?,?>)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); // this causes the "old" messages to be released - consumerSession.close(); - consumerConnection.close(); + _consumerSession.close(); + _consumerConnection.close(); - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -243,54 +251,54 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } public void testConflatedQueueDepth() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - final long queueDepth = ((AMQSession<?, ?>)producerSession).getQueueDepth((AMQDestination)queue, true); + final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true); assertEquals(10, queueDepth); } public void testConflationBrowser() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - ((AMQSession<?,?>)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); - consumer = consumerSession.createConsumer(browseQueue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); Message received; List<Message> messages = new ArrayList<Message>(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } @@ -300,57 +308,53 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } messages.clear(); - producer.send(nextMessage(MSG_COUNT, producerSession)); + _producer.send(nextMessage(MSG_COUNT, _producerSession)); - ((AMQSession<?,?>)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - while((received = consumer.receive(1000))!=null) + while((received = _consumer.receive(1000))!=null) { messages.add(received); } assertEquals("Unexpected number of messages received",1,messages.size()); - assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg")); - - - producer.close(); - producerSession.close(); - producerConnection.close(); - + assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + _producer.close(); + _producerSession.close(); + _producerConnection.close(); } - public void testConflation2Browsers() throws Exception { - consumerConnection = getConnection(); - consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - createConflationQueue(producerSession); - producer = producerSession.createProducer(queue); + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { - producer.send(nextMessage(msg, producerSession)); + _producer.send(nextMessage(msg, _producerSession)); } - ((AMQSession<?,?>)producerSession).sync(); + ((AMQSession<?,?>)_producerSession).sync(); - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+queueName+"?browse='true'&durable='true'"); + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); AMQQueue browseQueue = new AMQQueue(url); - consumer = consumerSession.createConsumer(browseQueue); - MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue); - consumerConnection.start(); + _consumer = _consumerSession.createConsumer(browseQueue); + MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); List<Message> messages = new ArrayList<Message>(); List<Message> messages2 = new ArrayList<Message>(); - Message received = consumer.receive(1000); + Message received = _consumer.receive(1000); Message received2 = consumer2.receive(1000); while(received!=null || received2!=null) @@ -365,7 +369,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase } - received = consumer.receive(1000); + received = _consumer.receive(1000); received2 = consumer2.receive(1000); } @@ -376,32 +380,194 @@ public class ConflationQueueTest extends QpidBrokerTestCase for(int i = 0 ; i < 10; i++) { Message msg = messages.get(i); - assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); msg = messages2.get(i); - assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg")); + assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + } + + public void testParallelProductionAndConsumption() throws Exception + { + createConflationQueue(_producerSession); + + // Start producing threads that send messages + BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1"); + messageProducer1.startSendingMessages(); + BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2"); + messageProducer2.startSendingMessages(); + + Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1); + + messageProducer1.join(); + messageProducer2.join(); + + final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); + assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); + final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); + assertEquals(lastSentMessages1, lastSentMessages2); + + assertEquals("The last message sent for each key should match the last message received for that key", + lastSentMessages1, lastReceivedMessages); + + assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); + } + + private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception + { + producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); + + _consumerConnection = getConnection(); + int smallPrefetchToEncourageConflation = 1; + _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation); + + LOGGER.info("Starting to receive"); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>(); + + Message message; + int numberOfShutdownsReceived = 0; + int numberOfMessagesReceived = 0; + while(numberOfShutdownsReceived < 2) + { + message = _consumer.receive(10000); + assertNotNull(message); + + if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) + { + numberOfShutdownsReceived++; + } + else + { + numberOfMessagesReceived++; + putMessageInMap(message, messageSequenceNumbersByKey); + } + } + + LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); + + return messageSequenceNumbersByKey; + } + + private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException + { + String keyValue = message.getStringProperty(KEY_PROPERTY); + Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); + messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); + } + + private class BackgroundMessageProducer + { + static final String SHUTDOWN = "SHUTDOWN"; + + private final String _threadName; + + private volatile Exception _exception; + + private Thread _thread; + private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>(); + private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4); + + public BackgroundMessageProducer(String threadName) + { + _threadName = threadName; + } + + public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException + { + final long latchTimeout = 60000; + boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); + assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success); + LOGGER.info("Quarter of messages sent"); + } + + public Exception getException() + { + return _exception; + } + + public Map<String, Integer> getMessageSequenceNumbersByKey() + { + return Collections.unmodifiableMap(_messageSequenceNumbersByKey); } + public void startSendingMessages() + { + Runnable messageSender = new Runnable() + { + @Override + public void run() + { + try + { + LOGGER.info("Starting to send in background thread"); + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer backgroundProducer = producerSession.createProducer(_queue); + for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) + { + Message message = nextMessage(messageNumber, producerSession, 2); + backgroundProducer.send(message); + + putMessageInMap(message, _messageSequenceNumbersByKey); + _quarterOfMessagesSentLatch.countDown(); + } + + Message shutdownMessage = producerSession.createMessage(); + shutdownMessage.setBooleanProperty(SHUTDOWN, true); + backgroundProducer.send(shutdownMessage); + + LOGGER.info("Finished sending in background thread"); + } + catch (Exception e) + { + _exception = e; + throw new RuntimeException(e); + } + } + }; + + _thread = new Thread(messageSender); + _thread.setName(_threadName); + _thread.start(); + } - producer.close(); - producerSession.close(); - producerConnection.close(); + public void join() throws InterruptedException + { + final int timeoutInMillis = 120000; + _thread.join(timeoutInMillis); + assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); + } } private void createConflationQueue(Session session) throws AMQException { final Map<String,Object> arguments = new HashMap<String, Object>(); - arguments.put("qpid.last_value_queue_key","key"); - ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); - queue = new AMQQueue("amq.direct", queueName); - ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); + arguments.put("qpid.last_value_queue_key",KEY_PROPERTY); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); + _queue = new AMQQueue("amq.direct", _queueName); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue); } private Message nextMessage(int msg, Session producerSession) throws JMSException { + return nextMessage(msg, producerSession, 10); + } + + private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException + { Message send = producerSession.createTextMessage("Message: " + msg); - send.setStringProperty("key", String.valueOf(msg % 10)); - send.setIntProperty("msg", msg); + send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues)); + send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); return send; } |