summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-06-05 13:48:44 +0000
committerAlex Rudyy <orudyy@apache.org>2013-06-05 13:48:44 +0000
commit81de16dca5f5b035a25ad87e9e3fbb4c45093e2f (patch)
tree514978ef8ac4d9d562cacb2575c5bf65e467e0f6
parent0fc65ef26eb727986a1eed963b5314c1d8b463f4 (diff)
downloadqpid-python-81de16dca5f5b035a25ad87e9e3fbb4c45093e2f.tar.gz
QPID-4908: Expose queue attributes MessageGroupKey and MessageGroupSharedGroups via queue MBean
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1489872 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java13
-rw-r--r--java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java12
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java35
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java35
5 files changed, 88 insertions, 9 deletions
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index 94fac218ff..cda32cc3f8 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -662,4 +662,17 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
final Number statistic = (Number) _queue.getStatistics().getStatistic(name);
return statistic == null ? Integer.valueOf(0) : statistic;
}
+
+ @Override
+ public String getMessageGroupKey()
+ {
+ return (String) _queue.getAttribute(Queue.MESSAGE_GROUP_KEY);
+ }
+
+ @Override
+ public boolean isMessageGroupSharedGroups()
+ {
+ Boolean value = (Boolean) _queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS);
+ return value == null ? false : value.booleanValue();
+ }
}
diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index f2663bca4e..3711a90f3b 100644
--- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -436,4 +436,16 @@ public class QueueMBeanTest extends QpidTestCase
byte[] data = (byte[]) comp.get(ManagedQueue.CONTENT);
assertEquals(messageSize, data.length);
}
+
+ public void testGetMessageGroupKey()
+ {
+ when(_mockQueue.getAttribute(Queue.MESSAGE_GROUP_KEY)).thenReturn(getTestName());
+ assertEquals("Unexpected message group key", getTestName(), _queueMBean.getMessageGroupKey());
+ }
+
+ public void testIsSharedMessageGroup()
+ {
+ when(_mockQueue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)).thenReturn(true);
+ assertEquals("Unexpected message group sharing", true, _queueMBean.isMessageGroupSharedGroups());
+ }
}
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
index b00b28b2a9..e6f24c2c73 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
@@ -89,6 +89,8 @@ public interface ManagedQueue
static final String ATTR_FLOW_RESUME_CAPACITY = "FlowResumeCapacity";
static final String ATTR_EXCLUSIVE = "Exclusive";
static final String ATTR_ALT_EXCHANGE = "AlternateExchange";
+ static final String ATTR_SHARED_MESSAGE_GROUP = "MessageGroupSharedGroups";
+ static final String ATTR_MESSAGE_GROUP_KEY = "MessageGroupKey";
//All attribute names constant
static final List<String> QUEUE_ATTRIBUTES
@@ -116,7 +118,9 @@ public interface ManagedQueue
ATTR_FLOW_OVERFULL,
ATTR_FLOW_RESUME_CAPACITY,
ATTR_EXCLUSIVE,
- ATTR_ALT_EXCHANGE
+ ATTR_ALT_EXCHANGE,
+ ATTR_SHARED_MESSAGE_GROUP,
+ ATTR_MESSAGE_GROUP_KEY
))));
/**
@@ -309,13 +313,6 @@ public interface ManagedQueue
void setDescription(String string);
/**
- * Gets the queue type
- * @since Qpid JMX API 2.5
- */
- @MBeanAttribute(name="QueueType", description="Type of the queue e.g. standard, priority, etc")
- String getQueueType();
-
- /**
* Returns the current flow control FlowResumeCapacity of the queue in bytes.
*
* @since Qpid JMX API 1.6
@@ -385,6 +382,28 @@ public interface ManagedQueue
*/
String getAlternateExchange() throws IOException;
+
+ /**
+ * Gets the queue type
+ * @since Qpid JMX API 2.5
+ */
+ @MBeanAttribute(name="QueueType", description="Type of the queue e.g. standard, priority, etc")
+ String getQueueType();
+
+ /**
+ * Gets the message group key
+ * @since Qpid JMX API 2.7
+ */
+ @MBeanAttribute(name="MessageGroupKey", description="Message header name to hold message group value")
+ String getMessageGroupKey();
+
+ /**
+ * Gets the message group key
+ * @since Qpid JMX API 2.7
+ */
+ @MBeanAttribute(name="MessageGroupSharedGroups", description="Indicates C++ compatibility mode, the Broker enforces a looser guarantee, nameley that all the currently unacknowledged messages in a group will be sent to the same consumer. This means that only one consumer can be processing messages from a particular group at a given time. When the consumer acknowledges all of its acquired messages, then the broker may pass the next pending message from that group to a different consumer")
+ boolean isMessageGroupSharedGroups();
+
//********** Operations *****************//
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
index 43249ea004..b1519a27b6 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
@@ -46,7 +46,7 @@ public interface ServerInformation
* Qpid JMX API 1.1 can be assumed.
*/
int QPID_JMX_API_MAJOR_VERSION = 2;
- int QPID_JMX_API_MINOR_VERSION = 6;
+ int QPID_JMX_API_MINOR_VERSION = 7;
/**
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index 448886d056..cf066e3c01 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.NotificationCheckTest;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.queue.SimpleAMQQueueTest;
import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
import org.apache.qpid.test.utils.JMXTestUtils;
@@ -635,6 +636,40 @@ public class QueueManagementTest extends QpidBrokerTestCase
assertMessageIndicesOn(_sourceQueue, 1,2,3,4,5,6,13);
}
+ public void testGetMessageGroupKey() throws Exception
+ {
+ final String queueName = getName();
+ final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+
+ final Object messageGroupKey = "test";
+ final Map<String, Object> arguments = Collections.singletonMap(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey);
+ managedBroker.createNewQueue(queueName, null, true, arguments);
+
+ final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+
+ assertNotNull("Manager queue expected to be available", managedQueue);
+ assertEquals("Unexpected message group key", messageGroupKey, managedQueue.getMessageGroupKey());
+ assertEquals("Unexpected message group sharing", false, managedQueue.isMessageGroupSharedGroups());
+ }
+
+ public void testIsMessageGroupSharedGroups() throws Exception
+ {
+ final String queueName = getName();
+ final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+
+ final Object messageGroupKey = "test";
+ final Map<String, Object> arguments = new HashMap<String, Object>(2);
+ arguments.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, messageGroupKey);
+ arguments.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ managedBroker.createNewQueue(queueName, null, true, arguments);
+
+ final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+
+ assertNotNull("Manager queue expected to be available", managedQueue);
+ assertEquals("Unexpected message group key", messageGroupKey, managedQueue.getMessageGroupKey());
+ assertEquals("Unexpected message group sharing", true, managedQueue.isMessageGroupSharedGroups());
+ }
+
@Override
public Message createNextMessage(Session session, int messageNumber) throws JMSException
{