From 81de16dca5f5b035a25ad87e9e3fbb4c45093e2f Mon Sep 17 00:00:00 2001 From: Alex Rudyy Date: Wed, 5 Jun 2013 13:48:44 +0000 Subject: QPID-4908: Expose queue attributes MessageGroupKey and MessageGroupSharedGroups via queue MBean git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1489872 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/jmx/mbeans/QueueMBean.java | 13 ++++++++ .../qpid/server/jmx/mbeans/QueueMBeanTest.java | 12 ++++++++ .../management/common/mbeans/ManagedQueue.java | 35 +++++++++++++++++----- .../common/mbeans/ServerInformation.java | 2 +- .../management/jmx/QueueManagementTest.java | 35 ++++++++++++++++++++++ 5 files changed, 88 insertions(+), 9 deletions(-) diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 94fac218ff..cda32cc3f8 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -662,4 +662,17 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN final Number statistic = (Number) _queue.getStatistics().getStatistic(name); return statistic == null ? Integer.valueOf(0) : statistic; } + + @Override + public String getMessageGroupKey() + { + return (String) _queue.getAttribute(Queue.MESSAGE_GROUP_KEY); + } + + @Override + public boolean isMessageGroupSharedGroups() + { + Boolean value = (Boolean) _queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS); + return value == null ? false : value.booleanValue(); + } } diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index f2663bca4e..3711a90f3b 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -436,4 +436,16 @@ public class QueueMBeanTest extends QpidTestCase byte[] data = (byte[]) comp.get(ManagedQueue.CONTENT); assertEquals(messageSize, data.length); } + + public void testGetMessageGroupKey() + { + when(_mockQueue.getAttribute(Queue.MESSAGE_GROUP_KEY)).thenReturn(getTestName()); + assertEquals("Unexpected message group key", getTestName(), _queueMBean.getMessageGroupKey()); + } + + public void testIsSharedMessageGroup() + { + when(_mockQueue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)).thenReturn(true); + assertEquals("Unexpected message group sharing", true, _queueMBean.isMessageGroupSharedGroups()); + } } diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java index b00b28b2a9..e6f24c2c73 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java @@ -89,6 +89,8 @@ public interface ManagedQueue static final String ATTR_FLOW_RESUME_CAPACITY = "FlowResumeCapacity"; static final String ATTR_EXCLUSIVE = "Exclusive"; static final String ATTR_ALT_EXCHANGE = "AlternateExchange"; + static final String ATTR_SHARED_MESSAGE_GROUP = "MessageGroupSharedGroups"; + static final String ATTR_MESSAGE_GROUP_KEY = "MessageGroupKey"; //All attribute names constant static final List QUEUE_ATTRIBUTES @@ -116,7 +118,9 @@ public interface ManagedQueue ATTR_FLOW_OVERFULL, ATTR_FLOW_RESUME_CAPACITY, ATTR_EXCLUSIVE, - ATTR_ALT_EXCHANGE + ATTR_ALT_EXCHANGE, + ATTR_SHARED_MESSAGE_GROUP, + ATTR_MESSAGE_GROUP_KEY )))); /** @@ -308,13 +312,6 @@ public interface ManagedQueue @MBeanAttribute(name="Description", description="Free text description of the queue") void setDescription(String string); - /** - * Gets the queue type - * @since Qpid JMX API 2.5 - */ - @MBeanAttribute(name="QueueType", description="Type of the queue e.g. standard, priority, etc") - String getQueueType(); - /** * Returns the current flow control FlowResumeCapacity of the queue in bytes. * @@ -385,6 +382,28 @@ public interface ManagedQueue */ String getAlternateExchange() throws IOException; + + /** + * Gets the queue type + * @since Qpid JMX API 2.5 + */ + @MBeanAttribute(name="QueueType", description="Type of the queue e.g. standard, priority, etc") + String getQueueType(); + + /** + * Gets the message group key + * @since Qpid JMX API 2.7 + */ + @MBeanAttribute(name="MessageGroupKey", description="Message header name to hold message group value") + String getMessageGroupKey(); + + /** + * Gets the message group key + * @since Qpid JMX API 2.7 + */ + @MBeanAttribute(name="MessageGroupSharedGroups", description="Indicates C++ compatibility mode, the Broker enforces a looser guarantee, nameley that all the currently unacknowledged messages in a group will be sent to the same consumer. This means that only one consumer can be processing messages from a particular group at a given time. When the consumer acknowledges all of its acquired messages, then the broker may pass the next pending message from that group to a different consumer") + boolean isMessageGroupSharedGroups(); + //********** Operations *****************// diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java index 43249ea004..b1519a27b6 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java @@ -46,7 +46,7 @@ public interface ServerInformation * Qpid JMX API 1.1 can be assumed. */ int QPID_JMX_API_MAJOR_VERSION = 2; - int QPID_JMX_API_MINOR_VERSION = 6; + int QPID_JMX_API_MINOR_VERSION = 7; /** diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 448886d056..cf066e3c01 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.NotificationCheckTest; +import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.queue.SimpleAMQQueueTest; import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; import org.apache.qpid.test.utils.JMXTestUtils; @@ -635,6 +636,40 @@ public class QueueManagementTest extends QpidBrokerTestCase assertMessageIndicesOn(_sourceQueue, 1,2,3,4,5,6,13); } + public void testGetMessageGroupKey() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Object messageGroupKey = "test"; + final Map arguments = Collections.singletonMap(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey); + managedBroker.createNewQueue(queueName, null, true, arguments); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + + assertNotNull("Manager queue expected to be available", managedQueue); + assertEquals("Unexpected message group key", messageGroupKey, managedQueue.getMessageGroupKey()); + assertEquals("Unexpected message group sharing", false, managedQueue.isMessageGroupSharedGroups()); + } + + public void testIsMessageGroupSharedGroups() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Object messageGroupKey = "test"; + final Map arguments = new HashMap(2); + arguments.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey); + arguments.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + managedBroker.createNewQueue(queueName, null, true, arguments); + + final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + + assertNotNull("Manager queue expected to be available", managedQueue); + assertEquals("Unexpected message group key", messageGroupKey, managedQueue.getMessageGroupKey()); + assertEquals("Unexpected message group sharing", true, managedQueue.isMessageGroupSharedGroups()); + } + @Override public Message createNextMessage(Session session, int messageNumber) throws JMSException { -- cgit v1.2.1