summaryrefslogtreecommitdiff
path: root/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java')
-rw-r--r--java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java313
1 files changed, 313 insertions, 0 deletions
diff --git a/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
new file mode 100644
index 0000000000..56802d0403
--- /dev/null
+++ b/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
@@ -0,0 +1,313 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.jmx.mbeans;
+
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.server.jmx.AMQManagedObject;
+import org.apache.qpid.server.jmx.ManagedObject;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.OperationsException;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ExchangeMBean extends AMQManagedObject implements ManagedExchange
+{
+
+ public static final String FANOUT_EXCHANGE_TYPE = "fanout";
+ public static final String HEADERS_EXCHANGE_TYPE = "headers";
+
+ private static final String[] TABULAR_UNIQUE_INDEX_ARRAY =
+ TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]);
+
+ private static final String[] COMPOSITE_ITEM_NAMES_ARRAY =
+ COMPOSITE_ITEM_NAMES.toArray(new String[COMPOSITE_ITEM_NAMES.size()]);
+
+ private static final String[] COMPOSITE_ITEM_DESCRIPTIONS_ARRAY =
+ COMPOSITE_ITEM_DESCRIPTIONS.toArray(new String[COMPOSITE_ITEM_DESCRIPTIONS.size()]);
+
+ private static final OpenType[] BINDING_ITEM_TYPES;
+ private static final CompositeType BINDING_DATA_TYPE;
+ private static final OpenType[] HEADERS_BINDING_ITEM_TYPES;
+
+
+ private static final CompositeType HEADERS_BINDING_DATA_TYPE;
+
+ private static final String[] HEADERS_COMPOSITE_ITEM_NAMES_ARRAY =
+ HEADERS_COMPOSITE_ITEM_NAMES.toArray(new String[HEADERS_COMPOSITE_ITEM_NAMES.size()]);
+
+ private static final String[] HEADERS_COMPOSITE_ITEM_DESCS_ARRAY =
+ HEADERS_COMPOSITE_ITEM_DESC.toArray(new String[HEADERS_COMPOSITE_ITEM_DESC.size()]);
+ private static final String[] HEADERS_TABULAR_UNIQUE_INDEX_ARRAY =
+ HEADERS_TABULAR_UNIQUE_INDEX.toArray(new String[HEADERS_TABULAR_UNIQUE_INDEX.size()]);
+
+ static
+ {
+ try
+ {
+ BINDING_ITEM_TYPES = new OpenType[] {SimpleType.STRING, new ArrayType(1, SimpleType.STRING)};
+
+ BINDING_DATA_TYPE= new CompositeType("Exchange Binding", "Binding key and Queue names",
+ COMPOSITE_ITEM_NAMES_ARRAY,
+ COMPOSITE_ITEM_DESCRIPTIONS_ARRAY,
+ BINDING_ITEM_TYPES);
+
+ HEADERS_BINDING_ITEM_TYPES = new OpenType[] {SimpleType.INTEGER,
+ SimpleType.STRING,
+ new ArrayType(1, SimpleType.STRING)};
+
+ HEADERS_BINDING_DATA_TYPE = new CompositeType("Exchange Binding", "Queue name and header bindings",
+ HEADERS_COMPOSITE_ITEM_NAMES_ARRAY,
+ HEADERS_COMPOSITE_ITEM_DESCS_ARRAY,
+ HEADERS_BINDING_ITEM_TYPES);
+
+
+ }
+ catch(OpenDataException e)
+ {
+ throw new RuntimeException("Unexpected Error creating ArrayType", e);
+ }
+ }
+
+
+ private final Exchange _exchange;
+ private final VirtualHostMBean _vhostMBean;
+
+ protected ExchangeMBean(Exchange exchange, VirtualHostMBean virtualHostMBean)
+ throws JMException
+ {
+ super(ManagedExchange.class, ManagedExchange.TYPE, virtualHostMBean.getRegistry());
+ _exchange = exchange;
+ _vhostMBean = virtualHostMBean;
+
+ register();
+ }
+
+ public String getObjectInstanceName()
+ {
+ return ObjectName.quote(getName());
+ }
+
+ @Override
+ public ManagedObject getParentObject()
+ {
+ return _vhostMBean;
+ }
+
+ public ObjectName getObjectName() throws MalformedObjectNameException
+ {
+ String objNameString = super.getObjectName().toString();
+ objNameString = objNameString + ",ExchangeType=" + getExchangeType();
+ return new ObjectName(objNameString);
+ }
+
+
+ public String getName()
+ {
+ return _exchange.getName();
+ }
+
+ public String getExchangeType()
+ {
+ return _exchange.getExchangeType();
+ }
+
+ public Integer getTicketNo()
+ {
+ return 0;
+ }
+
+ public boolean isDurable()
+ {
+ return _exchange.isDurable();
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _exchange.getLifetimePolicy() == LifetimePolicy.AUTO_DELETE;
+ }
+
+ public TabularData bindings() throws IOException, JMException
+ {
+ if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType()))
+ {
+ return getHeadersBindings(_exchange.getBindings());
+ }
+ else
+ {
+ return getNonHeadersBindings(_exchange.getBindings());
+ }
+ }
+
+ private TabularData getHeadersBindings(Collection<Binding> bindings) throws OpenDataException
+ {
+ TabularType bindinglistDataType =
+ new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(),
+ HEADERS_BINDING_DATA_TYPE,
+ HEADERS_TABULAR_UNIQUE_INDEX_ARRAY);
+
+ TabularDataSupport bindingList = new TabularDataSupport(bindinglistDataType);
+ int count = 1;
+ for (Binding binding : bindings)
+ {
+
+ String queueName = binding.getParent(Queue.class).getName();
+
+
+ Map<String,Object> headerMappings = binding.getArguments();
+
+ final List<String> mappingList = new ArrayList<String>();
+
+ if(headerMappings != null)
+ {
+ for(Map.Entry<String,Object> entry : headerMappings.entrySet())
+ {
+
+ mappingList.add(entry.getKey() + "=" + entry.getValue());
+ }
+ }
+
+
+ Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
+ CompositeData bindingData = new CompositeDataSupport(HEADERS_BINDING_DATA_TYPE,
+ HEADERS_COMPOSITE_ITEM_NAMES_ARRAY,
+ bindingItemValues);
+ bindingList.put(bindingData);
+ }
+
+ return bindingList;
+
+ }
+
+ private TabularData getNonHeadersBindings(Collection<Binding> bindings) throws OpenDataException
+ {
+
+ TabularType bindinglistDataType =
+ new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(),
+ BINDING_DATA_TYPE,
+ TABULAR_UNIQUE_INDEX_ARRAY);
+
+ TabularDataSupport bindingList = new TabularDataSupport(bindinglistDataType);
+
+ Map<String, List<String>> bindingMap = new HashMap<String, List<String>>();
+
+ for (Binding binding : bindings)
+ {
+ String key = FANOUT_EXCHANGE_TYPE.equals(_exchange.getExchangeType()) ? "*" : binding.getName();
+ List<String> queueList = bindingMap.get(key);
+ if(queueList == null)
+ {
+ queueList = new ArrayList<String>();
+ bindingMap.put(key, queueList);
+ }
+ queueList.add(binding.getParent(Queue.class).getName());
+
+ }
+
+ for(Map.Entry<String, List<String>> entry : bindingMap.entrySet())
+ {
+ Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[0])};
+ CompositeData bindingData = new CompositeDataSupport(BINDING_DATA_TYPE,
+ COMPOSITE_ITEM_NAMES_ARRAY,
+ bindingItemValues);
+ bindingList.put(bindingData);
+ }
+
+ return bindingList;
+ }
+
+ public void createNewBinding(String queueName, String binding) throws JMException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+
+ if(HEADERS_EXCHANGE_TYPE.equals(_exchange.getExchangeType()))
+ {
+ final String[] bindings = binding.split(",");
+ for (int i = 0; i < bindings.length; i++)
+ {
+ final String[] keyAndValue = bindings[i].split("=");
+ if (keyAndValue == null || keyAndValue.length == 0 || keyAndValue.length > 2 || keyAndValue[0].length() == 0)
+ {
+ throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\"");
+ }
+
+ if(keyAndValue.length == 1)
+ {
+ //no value was given, only a key. Use an empty value to signal match on key presence alone
+ arguments.put(keyAndValue[0], "");
+ }
+ else
+ {
+ arguments.put(keyAndValue[0], keyAndValue[1]);
+ }
+ }
+ }
+
+ VirtualHost virtualHost = _exchange.getParent(VirtualHost.class);
+ Queue queue = MBeanUtils.findQueueFromQueueName(virtualHost, queueName);
+ _exchange.createBinding(binding, queue, arguments, Collections.EMPTY_MAP);
+ }
+
+ public void removeBinding(String queueName, String bindingKey)
+ throws IOException, JMException
+ {
+ VirtualHost virtualHost = _exchange.getParent(VirtualHost.class);
+ Queue queue = MBeanUtils.findQueueFromQueueName(virtualHost, queueName);;
+
+ boolean deleted = false;
+ for(Binding binding : _exchange.getBindings())
+ {
+ if(queue.equals(binding.getParent(Queue.class)) && bindingKey.equals(binding.getName()))
+ {
+ binding.delete();
+ deleted = true;
+ }
+ }
+
+ if (!deleted)
+ {
+ throw new OperationsException("No such binding \"" + bindingKey + "\" on queue \"" + queueName + "\"");
+ }
+ }
+}