diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java | 252 |
1 files changed, 176 insertions, 76 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 070d105805..f70250132a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.lang.time.FastDateFormat; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.message.AMQMessage; @@ -39,11 +40,21 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import javax.management.JMException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularData; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; /** - * Test class to test AMQQueueMBean attribtues and operations + * Test class to test AMQQueueMBean attributes and operations */ public class AMQQueueMBeanTest extends InternalBrokerBaseCase { @@ -139,6 +150,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase verifyBrokerState(); } + // todo: collect to a general testing class -duplicated from Systest/MessageReturntest private void verifyBrokerState() { @@ -219,7 +231,43 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse("Exclusive property should be false.", getQueue().isExclusive()); } - public void testExceptions() throws Exception + /** + * Tests view messages with two test messages. The first message is non-persistent, the second persistent + * and has timestamp/expiration. + * + */ + public void testViewMessages() throws Exception + { + sendMessages(1, false); + final Date msg2Timestamp = new Date(); + final Date msg2Expiration = new Date(msg2Timestamp.getTime() + 1000); + sendMessages(1, true, msg2Timestamp.getTime(), msg2Expiration.getTime()); + + final TabularData tab = _queueMBean.viewMessages(1l, 2l); + assertEquals("Unexpected number of rows in table", 2, tab.size()); + final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator(); + + // Check row1 + final CompositeDataSupport row1 = rowItr.next(); + assertEquals("Message should have AMQ message id", 1l, row1.get(ManagedQueue.MSG_AMQ_ID)); + assertNotNull("Expected message header array", row1.get(ManagedQueue.MSG_HEADER)); + final Map<String, String> row1Headers = headerArrayToMap((String[])row1.get(ManagedQueue.MSG_HEADER)); + assertEquals("Unexpected JMSPriority within header", "Non_Persistent", row1Headers.get("JMSDeliveryMode")); + assertEquals("Unexpected JMSTimestamp within header", "null", row1Headers.get("JMSTimestamp")); + assertEquals("Unexpected JMSExpiration within header", "null", row1Headers.get("JMSExpiration")); + + final CompositeDataSupport row2 = rowItr.next(); + assertEquals("Message should have AMQ message id", 2l, row2.get(ManagedQueue.MSG_AMQ_ID)); + assertNotNull("Expected message header array", row2.get(ManagedQueue.MSG_HEADER)); + final Map<String, String> row2Headers = headerArrayToMap((String[])row2.get(ManagedQueue.MSG_HEADER)); + assertEquals("Unexpected JMSPriority within header", "Persistent", row2Headers.get("JMSDeliveryMode")); + assertEquals("Unexpected JMSTimestamp within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Timestamp), + row2Headers.get("JMSTimestamp")); + assertEquals("Unexpected JMSExpiration within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Expiration), + row2Headers.get("JMSExpiration")); + } + + public void testViewMessageWithIllegalStartEndRanges() throws Exception { try { @@ -228,7 +276,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -238,7 +286,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -248,7 +296,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } try @@ -260,45 +308,24 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } + } - IncomingMessage msg = message(false, false); - getQueue().clearQueue(); - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - qs.add(getQueue()); - msg.enqueue(qs); - MessageMetaData mmd = msg.headersReceived(); - msg.setStoredMessage(getMessageStore().addMessage(mmd)); - long id = msg.getMessageNumber(); - - msg.addContentBodyFrame(new ContentChunk() - { - byte[] _data = new byte[((int)MESSAGE_SIZE)]; - - public int getSize() - { - return (int) MESSAGE_SIZE; - } - - public byte[] getData() - { - return _data; - } + public void testViewMessageContent() throws Exception + { + final List<AMQMessage> sentMessages = sendMessages(1, true); + final Long id = sentMessages.get(0).getMessageId(); - public void reduceToFit() - { + final CompositeData messageData = _queueMBean.viewMessageContent(id); + assertNotNull(messageData); + } - } - }); + public void testViewMessageContentWithUnknownMessageId() throws Exception + { + final List<AMQMessage> sentMessages = sendMessages(1, true); + final Long id = sentMessages.get(0).getMessageId(); - AMQMessage m = new AMQMessage(msg.getStoredMessage()); - for(BaseQueue q : msg.getDestinationQueues()) - { - q.enqueue(m); - } -// _queue.process(_storeContext, new QueueEntry(_queue, msg), false); - _queueMBean.viewMessageContent(id); try { _queueMBean.viewMessageContent(id + 1); @@ -306,7 +333,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase } catch (JMException ex) { - + // PASS } } @@ -364,47 +391,35 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse(channel.getBlocking()); } - private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException + public void testMaximumDeliveryCount() throws IOException { - MessagePublishInfo publish = new MessagePublishInfo() - { + assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount()); + } - public AMQShortString getExchange() - { - return null; - } + public void testViewAllMessages() throws Exception + { + final int messageCount = 5; + sendPersistentMessages(messageCount); - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - public boolean isImmediate() - { - return immediate; - } + final TabularData messageTable = _queueMBean.viewMessages(1L, 5L); + assertNotNull("Message table should not be null", messageTable); + assertEquals("Unexpected number of rows", messageCount, messageTable.size()); - public boolean isMandatory() - { - return false; - } - public AMQShortString getRoutingKey() - { - return null; - } - }; - - ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes - contentHeaderBody.setProperties(new BasicContentHeaderProperties()); - ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1)); - IncomingMessage msg = new IncomingMessage(publish); - msg.setContentHeaderBody(contentHeaderBody); - return msg; + final Iterator rowIterator = messageTable.values().iterator(); + // Get its message ID + final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next(); + final Long msgId = (Long) row1.get("AMQ MessageId"); + final Long queuePosition = (Long) row1.get("Queue Position"); + final Integer deliveryCount = (Integer) row1.get("Delivery Count"); + assertNotNull("Row should have value for queue position", queuePosition); + assertNotNull("Row should have value for msgid", msgId); + assertNotNull("Row should have value for deliveryCount", deliveryCount); } + @Override public void setUp() throws Exception { @@ -418,11 +433,25 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ApplicationRegistry.remove(); } - private void sendMessages(int messageCount, boolean persistent) throws AMQException + private void sendPersistentMessages(int messageCount) throws AMQException + { + sendMessages(messageCount, true); + assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean + .getMessageCount().intValue()); + } + + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException + { + return sendMessages(messageCount, persistent, 0l, 0l); + } + + private List<AMQMessage> sendMessages(int messageCount, boolean persistent, long timestamp, long expiration) throws AMQException { + final List<AMQMessage> sentMessages = new ArrayList<AMQMessage>(); + for (int i = 0; i < messageCount; i++) { - IncomingMessage currentMessage = message(false, persistent); + IncomingMessage currentMessage = createIncomingMessage(false, persistent, timestamp, expiration); ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(getQueue()); currentMessage.enqueue(qs); @@ -431,7 +460,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase MessageMetaData mmd = currentMessage.headersReceived(); currentMessage.setStoredMessage(getMessageStore().addMessage(mmd)); - // Add the body so we have somthing to test later + // Add the body so we have something to test later currentMessage.addContentBodyFrame( getSession().getMethodRegistry() .getProtocolVersionMethodConverter() @@ -444,7 +473,78 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase q.enqueue(m); } + sentMessages.add(m); + } + return sentMessages; + } + + private IncomingMessage createIncomingMessage(final boolean immediate, boolean persistent, long timestamp, long expiration) throws AMQException + { + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public void setExchange(AMQShortString exchange) + { + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + contentHeaderBody.setProperties(props); + props.setDeliveryMode((byte) (persistent ? 2 : 1)); + if (timestamp > 0) + { + props.setTimestamp(timestamp); + } + if (expiration > 0) + { + props.setExpiration(expiration); } + IncomingMessage msg = new IncomingMessage(publish); + msg.setContentHeaderBody(contentHeaderBody); + return msg; } + + /** + * + * Utility method to convert array of Strings in the form x = y into a + * map with key/value x => y. + * + */ + private Map<String,String> headerArrayToMap(final String[] headerArray) + { + final Map<String, String> headerMap = new HashMap<String, String>(); + final List<String> headerList = Arrays.asList(headerArray); + for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();) + { + final String nameValuePair = iterator.next(); + final String[] nameValue = nameValuePair.split(" *= *", 2); + headerMap.put(nameValue[0], nameValue[1]); + } + return headerMap; + } + + } |