diff options
author | Alex Rudyy <orudyy@apache.org> | 2013-06-06 23:12:19 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2013-06-06 23:12:19 +0000 |
commit | 51c92f5b0d6ee67f65041b7b81e655f7e6ae30b6 (patch) | |
tree | 4987bb184848e313bd7278cdc1f0e5874b1ba4fa | |
parent | 5b6536d0ed4183e4a221e4e927df476b05609a3f (diff) | |
download | qpid-python-51c92f5b0d6ee67f65041b7b81e655f7e6ae30b6.tar.gz |
QPID-4912: Allow setting queue binding arguments in exchange MBean
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1490468 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 179 insertions, 19 deletions
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java index 56802d0403..3e1a47c431 100644 --- a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java +++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java @@ -261,8 +261,15 @@ public class ExchangeMBean extends AMQManagedObject implements ManagedExchange { final Map<String,Object> arguments = new HashMap<String, Object>(); + createNewBinding(queueName, binding, arguments); + } + + @Override + public void createNewBinding(String queueName, String binding, Map<String, Object> arguments) throws JMException + { if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType())) { + arguments = new HashMap<String, Object>(arguments); final String[] bindings = binding.split(","); for (int i = 0; i < bindings.length; i++) { diff --git a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java index e350f80a25..e2b4567867 100644 --- a/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java +++ b/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBeanTest.java @@ -18,40 +18,29 @@ */ package org.apache.qpid.server.jmx.mbeans; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.never; -import static org.mockito.Matchers.isNull; -import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyMap; - +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.TreeMap; import javax.management.JMException; -import javax.management.ListenerNotFoundException; -import javax.management.Notification; -import javax.management.NotificationListener; import javax.management.OperationsException; +import junit.framework.TestCase; + import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.NotificationCheck; -import org.mockito.ArgumentMatcher; - -import junit.framework.TestCase; public class ExchangeMBeanTest extends TestCase { @@ -126,6 +115,13 @@ public class ExchangeMBeanTest extends TestCase verify(_mockExchange, never()).createBinding(anyString(), any(Queue.class), anyMap(), anyMap()); } + public void testCreateNewBindingWithArguments() throws Exception + { + Map<String, Object> arguments = Collections.<String, Object>singletonMap("x-filter-jms-selector", "ID='test'"); + _exchangeMBean.createNewBinding(QUEUE1_NAME, BINDING1, arguments); + verify(_mockExchange).createBinding(BINDING1, _mockQueue1, arguments, Collections.<String, Object>emptyMap()); + } + public void testRemoveBinding() throws Exception { Binding mockBinding1 = createBindingOnQueue(BINDING1, _mockQueue1); diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java index 6c1ab3de8d..32e5fbd125 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; /** * The management interface exposed to allow management of an Exchange. @@ -113,7 +114,23 @@ public interface ManagedExchange void createNewBinding(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue name") String queueName, @MBeanOperationParameter(name="Binding", description="New binding")String binding) throws JMException; - + + /** + * Creates new binding with the given queue using given binding key and binding arguments. + * @param queueName + * @param binding + * @param arguments + * @throws JMException + * @since Qpid JMX API 2.7 + */ + @MBeanOperation(name="createNewBinding", + description="create a new binding with this exchange", + impact= MBeanOperationInfo.ACTION) + void createNewBinding(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue name") String queueName, + @MBeanOperationParameter(name="Binding", description="New binding")String binding, + @MBeanOperationParameter(name="Arguments", description="Binding arguments")Map<String, Object> arguments) + throws JMException; + /** * Removes an exchange binding from a queue. * diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java new file mode 100644 index 0000000000..99ee99e8c5 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java @@ -0,0 +1,140 @@ +package org.apache.qpid.systest.management.jmx; + +import java.util.Collections; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ExchangeManagementTest extends QpidBrokerTestCase +{ + private static final String MESSAGE_PROPERTY_INDEX = "index"; + private static final String MESSAGE_PROPERTY_TEST = "test"; + private static final String MESSAGE_PROPERTY_DUMMY = "dummy"; + private static final String SELECTOR_ARGUMENT = AMQPFilterTypes.JMS_SELECTOR.toString(); + private static final String SELECTOR = MESSAGE_PROPERTY_TEST + "='test'"; + private static final String VIRTUAL_HOST = "test"; + + private JMXTestUtils _jmxUtils; + private ManagedBroker _managedBroker; + private String _testQueueName; + private ManagedExchange _directExchange; + private ManagedExchange _topicExchange; + private ManagedExchange _fanoutExchange; + private ManagedExchange _headersExchange; + private Connection _connection; + private Session _session; + + public void setUp() throws Exception + { + getBrokerConfiguration().addJmxManagementConfiguration(); + + _jmxUtils = new JMXTestUtils(this); + + super.setUp(); + + _jmxUtils.open(); + + _managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + _testQueueName = getTestName(); + _managedBroker.createNewQueue(_testQueueName, null, true); + _directExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.asString()); + _topicExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME.asString()); + _fanoutExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME.asString()); + _headersExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME.asString()); + + _connection = getConnection(); + _connection.start(); + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + } + + public void testCreateNewBindingWithArgumentsOnDirectExchange() throws Exception + { + String bindingKey = "test-direct-binding"; + + _directExchange.createNewBinding(_testQueueName, bindingKey, + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createQueue(bindingKey)); + } + + public void testCreateNewBindingWithArgumentsOnTopicExchange() throws Exception + { + String bindingKey = "test-topic-binding"; + + _topicExchange.createNewBinding(_testQueueName, bindingKey, + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createTopic(bindingKey)); + } + + public void testCreateNewBindingWithArgumentsOnFanoutExchange() throws Exception + { + _fanoutExchange.createNewBinding(_testQueueName, null, + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createQueue("fanout://amq.fanout//?routingkey='routing-key-must-not-be-null'")); + } + + public void testCreateNewBindingWithArgumentsOnHeadersExchange() throws Exception + { + // headers exchange uses 'dummy' property to match messages + // i.e. all test messages have matching header value + _headersExchange.createNewBinding(_testQueueName, "x-match=any,dummy=test", + Collections.<String, Object> singletonMap(SELECTOR_ARGUMENT, SELECTOR)); + + bindingTest(_session.createQueue("headers://amq.match//?routingkey='routing-key-must-not-be-null'")); + } + + private void bindingTest(Destination destination) throws JMSException + { + publishMessages(destination, 4); + receiveAndAssertMessages(2); + } + + private void publishMessages(Destination destination, int messageNumber) throws JMSException + { + MessageProducer producer = _session.createProducer(destination); + + for (int i = 0; i < messageNumber; i++) + { + Message m = _session.createMessage(); + m.setStringProperty(MESSAGE_PROPERTY_TEST, i % 2 == 0 ? MESSAGE_PROPERTY_TEST : ""); + m.setIntProperty(MESSAGE_PROPERTY_INDEX, i); + m.setStringProperty(MESSAGE_PROPERTY_DUMMY, "test"); + producer.send(m); + } + _session.commit(); + } + + private void receiveAndAssertMessages(int messageNumber) throws JMSException + { + MessageConsumer consumer = _session.createConsumer(_session.createQueue(_testQueueName)); + + for (int i = 0; i < messageNumber; i++) + { + int index = i * 2; + Message message = consumer.receive(1000l); + assertNotNull("Expected message is not received at " + i, message); + assertEquals("Unexpected test property at " + i, MESSAGE_PROPERTY_TEST, + message.getStringProperty(MESSAGE_PROPERTY_TEST)); + assertEquals("Unexpected index property at " + i, index, message.getIntProperty(MESSAGE_PROPERTY_INDEX)); + } + + Message message = consumer.receive(1000l); + assertNull("Unexpected message received", message); + _session.commit(); + } + +} |