summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java252
1 files changed, 176 insertions, 76 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 070d105805..f70250132a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.lang.time.FastDateFormat;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.message.AMQMessage;
@@ -39,11 +40,21 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
/**
- * Test class to test AMQQueueMBean attribtues and operations
+ * Test class to test AMQQueueMBean attributes and operations
*/
public class AMQQueueMBeanTest extends InternalBrokerBaseCase
{
@@ -139,6 +150,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
verifyBrokerState();
}
+
// todo: collect to a general testing class -duplicated from Systest/MessageReturntest
private void verifyBrokerState()
{
@@ -219,7 +231,43 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertFalse("Exclusive property should be false.", getQueue().isExclusive());
}
- public void testExceptions() throws Exception
+ /**
+ * Tests view messages with two test messages. The first message is non-persistent, the second persistent
+ * and has timestamp/expiration.
+ *
+ */
+ public void testViewMessages() throws Exception
+ {
+ sendMessages(1, false);
+ final Date msg2Timestamp = new Date();
+ final Date msg2Expiration = new Date(msg2Timestamp.getTime() + 1000);
+ sendMessages(1, true, msg2Timestamp.getTime(), msg2Expiration.getTime());
+
+ final TabularData tab = _queueMBean.viewMessages(1l, 2l);
+ assertEquals("Unexpected number of rows in table", 2, tab.size());
+ final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) tab.values().iterator();
+
+ // Check row1
+ final CompositeDataSupport row1 = rowItr.next();
+ assertEquals("Message should have AMQ message id", 1l, row1.get(ManagedQueue.MSG_AMQ_ID));
+ assertNotNull("Expected message header array", row1.get(ManagedQueue.MSG_HEADER));
+ final Map<String, String> row1Headers = headerArrayToMap((String[])row1.get(ManagedQueue.MSG_HEADER));
+ assertEquals("Unexpected JMSPriority within header", "Non_Persistent", row1Headers.get("JMSDeliveryMode"));
+ assertEquals("Unexpected JMSTimestamp within header", "null", row1Headers.get("JMSTimestamp"));
+ assertEquals("Unexpected JMSExpiration within header", "null", row1Headers.get("JMSExpiration"));
+
+ final CompositeDataSupport row2 = rowItr.next();
+ assertEquals("Message should have AMQ message id", 2l, row2.get(ManagedQueue.MSG_AMQ_ID));
+ assertNotNull("Expected message header array", row2.get(ManagedQueue.MSG_HEADER));
+ final Map<String, String> row2Headers = headerArrayToMap((String[])row2.get(ManagedQueue.MSG_HEADER));
+ assertEquals("Unexpected JMSPriority within header", "Persistent", row2Headers.get("JMSDeliveryMode"));
+ assertEquals("Unexpected JMSTimestamp within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Timestamp),
+ row2Headers.get("JMSTimestamp"));
+ assertEquals("Unexpected JMSExpiration within header", FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(msg2Expiration),
+ row2Headers.get("JMSExpiration"));
+ }
+
+ public void testViewMessageWithIllegalStartEndRanges() throws Exception
{
try
{
@@ -228,7 +276,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
try
@@ -238,7 +286,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
try
@@ -248,7 +296,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
try
@@ -260,45 +308,24 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
+ }
- IncomingMessage msg = message(false, false);
- getQueue().clearQueue();
- ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
- qs.add(getQueue());
- msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived();
- msg.setStoredMessage(getMessageStore().addMessage(mmd));
- long id = msg.getMessageNumber();
-
- msg.addContentBodyFrame(new ContentChunk()
- {
- byte[] _data = new byte[((int)MESSAGE_SIZE)];
-
- public int getSize()
- {
- return (int) MESSAGE_SIZE;
- }
-
- public byte[] getData()
- {
- return _data;
- }
+ public void testViewMessageContent() throws Exception
+ {
+ final List<AMQMessage> sentMessages = sendMessages(1, true);
+ final Long id = sentMessages.get(0).getMessageId();
- public void reduceToFit()
- {
+ final CompositeData messageData = _queueMBean.viewMessageContent(id);
+ assertNotNull(messageData);
+ }
- }
- });
+ public void testViewMessageContentWithUnknownMessageId() throws Exception
+ {
+ final List<AMQMessage> sentMessages = sendMessages(1, true);
+ final Long id = sentMessages.get(0).getMessageId();
- AMQMessage m = new AMQMessage(msg.getStoredMessage());
- for(BaseQueue q : msg.getDestinationQueues())
- {
- q.enqueue(m);
- }
-// _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
- _queueMBean.viewMessageContent(id);
try
{
_queueMBean.viewMessageContent(id + 1);
@@ -306,7 +333,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
}
catch (JMException ex)
{
-
+ // PASS
}
}
@@ -364,47 +391,35 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertFalse(channel.getBlocking());
}
- private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException
+ public void testMaximumDeliveryCount() throws IOException
{
- MessagePublishInfo publish = new MessagePublishInfo()
- {
+ assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount());
+ }
- public AMQShortString getExchange()
- {
- return null;
- }
+ public void testViewAllMessages() throws Exception
+ {
+ final int messageCount = 5;
+ sendPersistentMessages(messageCount);
- public void setExchange(AMQShortString exchange)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- public boolean isImmediate()
- {
- return immediate;
- }
+ final TabularData messageTable = _queueMBean.viewMessages(1L, 5L);
+ assertNotNull("Message table should not be null", messageTable);
+ assertEquals("Unexpected number of rows", messageCount, messageTable.size());
- public boolean isMandatory()
- {
- return false;
- }
- public AMQShortString getRoutingKey()
- {
- return null;
- }
- };
-
- ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
- contentHeaderBody.setProperties(new BasicContentHeaderProperties());
- ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(publish);
- msg.setContentHeaderBody(contentHeaderBody);
- return msg;
+ final Iterator rowIterator = messageTable.values().iterator();
+ // Get its message ID
+ final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next();
+ final Long msgId = (Long) row1.get("AMQ MessageId");
+ final Long queuePosition = (Long) row1.get("Queue Position");
+ final Integer deliveryCount = (Integer) row1.get("Delivery Count");
+ assertNotNull("Row should have value for queue position", queuePosition);
+ assertNotNull("Row should have value for msgid", msgId);
+ assertNotNull("Row should have value for deliveryCount", deliveryCount);
}
+
@Override
public void setUp() throws Exception
{
@@ -418,11 +433,25 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
ApplicationRegistry.remove();
}
- private void sendMessages(int messageCount, boolean persistent) throws AMQException
+ private void sendPersistentMessages(int messageCount) throws AMQException
+ {
+ sendMessages(messageCount, true);
+ assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean
+ .getMessageCount().intValue());
+ }
+
+ private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
+ {
+ return sendMessages(messageCount, persistent, 0l, 0l);
+ }
+
+ private List<AMQMessage> sendMessages(int messageCount, boolean persistent, long timestamp, long expiration) throws AMQException
{
+ final List<AMQMessage> sentMessages = new ArrayList<AMQMessage>();
+
for (int i = 0; i < messageCount; i++)
{
- IncomingMessage currentMessage = message(false, persistent);
+ IncomingMessage currentMessage = createIncomingMessage(false, persistent, timestamp, expiration);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(getQueue());
currentMessage.enqueue(qs);
@@ -431,7 +460,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
MessageMetaData mmd = currentMessage.headersReceived();
currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
- // Add the body so we have somthing to test later
+ // Add the body so we have something to test later
currentMessage.addContentBodyFrame(
getSession().getMethodRegistry()
.getProtocolVersionMethodConverter()
@@ -444,7 +473,78 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
q.enqueue(m);
}
+ sentMessages.add(m);
+ }
+ return sentMessages;
+ }
+
+ private IncomingMessage createIncomingMessage(final boolean immediate, boolean persistent, long timestamp, long expiration) throws AMQException
+ {
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ contentHeaderBody.setProperties(props);
+ props.setDeliveryMode((byte) (persistent ? 2 : 1));
+ if (timestamp > 0)
+ {
+ props.setTimestamp(timestamp);
+ }
+ if (expiration > 0)
+ {
+ props.setExpiration(expiration);
}
+ IncomingMessage msg = new IncomingMessage(publish);
+ msg.setContentHeaderBody(contentHeaderBody);
+ return msg;
}
+
+ /**
+ *
+ * Utility method to convert array of Strings in the form x = y into a
+ * map with key/value x =&gt; y.
+ *
+ */
+ private Map<String,String> headerArrayToMap(final String[] headerArray)
+ {
+ final Map<String, String> headerMap = new HashMap<String, String>();
+ final List<String> headerList = Arrays.asList(headerArray);
+ for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();)
+ {
+ final String nameValuePair = iterator.next();
+ final String[] nameValue = nameValuePair.split(" *= *", 2);
+ headerMap.put(nameValue[0], nameValue[1]);
+ }
+ return headerMap;
+ }
+
+
}