summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-06-06 23:12:19 +0000
committerAlex Rudyy <orudyy@apache.org>2013-06-06 23:12:19 +0000
commit51c92f5b0d6ee67f65041b7b81e655f7e6ae30b6 (patch)
tree4987bb184848e313bd7278cdc1f0e5874b1ba4fa
parent5b6536d0ed4183e4a221e4e927df476b05609a3f (diff)
downloadqpid-python-51c92f5b0d6ee67f65041b7b81e655f7e6ae30b6.tar.gz
QPID-4912: Allow setting queue binding arguments in exchange MBean
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1490468 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java7
-rw-r--r--java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java32
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java19
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java140
4 files changed, 179 insertions, 19 deletions
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
index 56802d0403..3e1a47c431 100644
--- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
@@ -261,8 +261,15 @@ public class ExchangeMBean extends AMQManagedObject implements ManagedExchange
{
final Map<String,Object> arguments = new HashMap<String, Object>();
+ createNewBinding(queueName, binding, arguments);
+ }
+
+ @Override
+ public void createNewBinding(String queueName, String binding, Map<String, Object> arguments) throws JMException
+ {
if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType()))
{
+ arguments = new HashMap<String, Object>(arguments);
final String[] bindings = binding.split(",");
for (int i = 0; i < bindings.length; i++)
{
diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java
index e350f80a25..e2b4567867 100644
--- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java
+++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java
@@ -18,40 +18,29 @@
*/
package org.apache.qpid.server.jmx.mbeans;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.never;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.anyMap;
-
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.TreeMap;
import javax.management.JMException;
-import javax.management.ListenerNotFoundException;
-import javax.management.Notification;
-import javax.management.NotificationListener;
import javax.management.OperationsException;
+import junit.framework.TestCase;
+
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.NotificationCheck;
-import org.mockito.ArgumentMatcher;
-
-import junit.framework.TestCase;
public class ExchangeMBeanTest extends TestCase
{
@@ -126,6 +115,13 @@ public class ExchangeMBeanTest extends TestCase
verify(_mockExchange, never()).createBinding(anyString(), any(Queue.class), anyMap(), anyMap());
}
+ public void testCreateNewBindingWithArguments() throws Exception
+ {
+ Map<String, Object> arguments = Collections.<String, Object>singletonMap("x-filter-jms-selector", "ID='test'");
+ _exchangeMBean.createNewBinding(QUEUE1_NAME, BINDING1, arguments);
+ verify(_mockExchange).createBinding(BINDING1, _mockQueue1, arguments, Collections.<String, Object>emptyMap());
+ }
+
public void testRemoveBinding() throws Exception
{
Binding mockBinding1 = createBindingOnQueue(BINDING1, _mockQueue1);
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
index 6c1ab3de8d..32e5fbd125 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* The management interface exposed to allow management of an Exchange.
@@ -113,7 +114,23 @@ public interface ManagedExchange
void createNewBinding(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue name") String queueName,
@MBeanOperationParameter(name="Binding", description="New binding")String binding)
throws JMException;
-
+
+ /**
+ * Creates new binding with the given queue using given binding key and binding arguments.
+ * @param queueName
+ * @param binding
+ * @param arguments
+ * @throws JMException
+ * @since Qpid JMX API 2.7
+ */
+ @MBeanOperation(name="createNewBinding",
+ description="create a new binding with this exchange",
+ impact= MBeanOperationInfo.ACTION)
+ void createNewBinding(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue name") String queueName,
+ @MBeanOperationParameter(name="Binding", description="New binding")String binding,
+ @MBeanOperationParameter(name="Arguments", description="Binding arguments")Map<String, Object> arguments)
+ throws JMException;
+
/**
* Removes an exchange binding from a queue.
*
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
new file mode 100644
index 0000000000..99ee99e8c5
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
@@ -0,0 +1,140 @@
+package org.apache.qpid.systest.management.jmx;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ExchangeManagementTest extends QpidBrokerTestCase
+{
+ private static final String MESSAGE_PROPERTY_INDEX = "index";
+ private static final String MESSAGE_PROPERTY_TEST = "test";
+ private static final String MESSAGE_PROPERTY_DUMMY = "dummy";
+ private static final String SELECTOR_ARGUMENT = AMQPFilterTypes.JMS_SELECTOR.toString();
+ private static final String SELECTOR = MESSAGE_PROPERTY_TEST + "='test'";
+ private static final String VIRTUAL_HOST = "test";
+
+ private JMXTestUtils _jmxUtils;
+ private ManagedBroker _managedBroker;
+ private String _testQueueName;
+ private ManagedExchange _directExchange;
+ private ManagedExchange _topicExchange;
+ private ManagedExchange _fanoutExchange;
+ private ManagedExchange _headersExchange;
+ private Connection _connection;
+ private Session _session;
+
+ public void setUp() throws Exception
+ {
+ getBrokerConfiguration().addJmxManagementConfiguration();
+
+ _jmxUtils = new JMXTestUtils(this);
+
+ super.setUp();
+
+ _jmxUtils.open();
+
+ _managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+ _testQueueName = getTestName();
+ _managedBroker.createNewQueue(_testQueueName, null, true);
+ _directExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString());
+ _topicExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME.asString());
+ _fanoutExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME.asString());
+ _headersExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME.asString());
+
+ _connection = getConnection();
+ _connection.start();
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ public void testCreateNewBindingWithArgumentsOnDirectExchange() throws Exception
+ {
+ String bindingKey = "test-direct-binding";
+
+ _directExchange.createNewBinding(_testQueueName, bindingKey,
+ Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR));
+
+ bindingTest(_session.createQueue(bindingKey));
+ }
+
+ public void testCreateNewBindingWithArgumentsOnTopicExchange() throws Exception
+ {
+ String bindingKey = "test-topic-binding";
+
+ _topicExchange.createNewBinding(_testQueueName, bindingKey,
+ Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR));
+
+ bindingTest(_session.createTopic(bindingKey));
+ }
+
+ public void testCreateNewBindingWithArgumentsOnFanoutExchange() throws Exception
+ {
+ _fanoutExchange.createNewBinding(_testQueueName, null,
+ Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR));
+
+ bindingTest(_session.createQueue("fanout://amq.fanout//?routingkey='routing-key-must-not-be-null'"));
+ }
+
+ public void testCreateNewBindingWithArgumentsOnHeadersExchange() throws Exception
+ {
+ // headers exchange uses 'dummy' property to match messages
+ // i.e. all test messages have matching header value
+ _headersExchange.createNewBinding(_testQueueName, "x-match=any,dummy=test",
+ Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR));
+
+ bindingTest(_session.createQueue("headers://amq.match//?routingkey='routing-key-must-not-be-null'"));
+ }
+
+ private void bindingTest(Destination destination) throws JMSException
+ {
+ publishMessages(destination, 4);
+ receiveAndAssertMessages(2);
+ }
+
+ private void publishMessages(Destination destination, int messageNumber) throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(destination);
+
+ for (int i = 0; i < messageNumber; i++)
+ {
+ Message m = _session.createMessage();
+ m.setStringProperty(MESSAGE_PROPERTY_TEST, i % 2 == 0 ? MESSAGE_PROPERTY_TEST : "");
+ m.setIntProperty(MESSAGE_PROPERTY_INDEX, i);
+ m.setStringProperty(MESSAGE_PROPERTY_DUMMY, "test");
+ producer.send(m);
+ }
+ _session.commit();
+ }
+
+ private void receiveAndAssertMessages(int messageNumber) throws JMSException
+ {
+ MessageConsumer consumer = _session.createConsumer(_session.createQueue(_testQueueName));
+
+ for (int i = 0; i < messageNumber; i++)
+ {
+ int index = i * 2;
+ Message message = consumer.receive(1000l);
+ assertNotNull("Expected message is not received at " + i, message);
+ assertEquals("Unexpected test property at " + i, MESSAGE_PROPERTY_TEST,
+ message.getStringProperty(MESSAGE_PROPERTY_TEST));
+ assertEquals("Unexpected index property at " + i, index, message.getIntProperty(MESSAGE_PROPERTY_INDEX));
+ }
+
+ Message message = consumer.receive(1000l);
+ assertNull("Unexpected message received", message);
+ _session.commit();
+ }
+
+}