diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java new file mode 100644 index 0000000000..d1ea5dba69 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeType; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.AMQQueueMBean; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; + +/** + * This MBean implements the broker management interface and exposes the + * Broker level management features like creating and deleting exchanges and queue. + */ +@MBeanDescription("This MBean exposes the broker level management features") +public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBroker +{ + private final QueueRegistry _queueRegistry; + private final ExchangeRegistry _exchangeRegistry; + private final ExchangeFactory _exchangeFactory; + private final DurableConfigurationStore _durableConfig; + + private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean; + + @MBeanConstructor("Creates the Broker Manager MBean") + public AMQBrokerManagerMBean(VirtualHostImpl.VirtualHostMBean virtualHostMBean) throws JMException + { + super(ManagedBroker.class, ManagedBroker.TYPE); + + _virtualHostMBean = virtualHostMBean; + VirtualHost virtualHost = virtualHostMBean.getVirtualHost(); + + _queueRegistry = virtualHost.getQueueRegistry(); + _exchangeRegistry = virtualHost.getExchangeRegistry(); + _durableConfig = virtualHost.getDurableConfigurationStore(); + _exchangeFactory = virtualHost.getExchangeFactory(); + } + + public String getObjectInstanceName() + { + return _virtualHostMBean.getVirtualHost().getName(); + } + + /** + * Returns an array of the exchange types available for creation. + * @since Qpid JMX API 1.3 + * @throws IOException + */ + public String[] getExchangeTypes() throws IOException + { + ArrayList<String> exchangeTypes = new ArrayList<String>(); + for(ExchangeType<? extends Exchange> ex : _exchangeFactory.getPublicCreatableTypes()) + { + exchangeTypes.add(ex.getName().toString()); + } + + return exchangeTypes.toArray(new String[0]); + } + + /** + * Returns a list containing the names of the attributes available for the Queue mbeans. + * @since Qpid JMX API 1.3 + * @throws IOException + */ + public List<String> retrieveQueueAttributeNames() throws IOException + { + return ManagedQueue.QUEUE_ATTRIBUTES; + } + + /** + * Returns a List of Object Lists containing the requested attribute values (in the same sequence requested) for each queue in the virtualhost. + * If a particular attribute cant be found or raises an mbean/reflection exception whilst being gathered its value is substituted with the String "-". + * @since Qpid JMX API 1.3 + * @throws IOException + */ + public List<List<Object>> retrieveQueueAttributeValues(String[] attributes) throws IOException + { + if(_queueRegistry.getQueues().size() == 0) + { + return new ArrayList<List<Object>>(); + } + + List<List<Object>> queueAttributesList = new ArrayList<List<Object>>(_queueRegistry.getQueues().size()); + + int attributesLength = attributes.length; + + for(AMQQueue queue : _queueRegistry.getQueues()) + { + AMQQueueMBean mbean = (AMQQueueMBean) queue.getManagedObject(); + + if(mbean == null) + { + continue; + } + + List<Object> attributeValues = new ArrayList<Object>(attributesLength); + + for(int i=0; i < attributesLength; i++) + { + try + { + attributeValues.add(mbean.getAttribute(attributes[i])); + } + catch (Exception e) + { + attributeValues.add("-"); + } + } + + queueAttributesList.add(attributeValues); + } + + return queueAttributesList; + } + + /** + * Creates new exchange and registers it with the registry. + * + * @param exchangeName + * @param type + * @param durable + * @throws JMException + * @throws MBeanException + */ + public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException, MBeanException + { + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); + try + { + synchronized (_exchangeRegistry) + { + Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); + if (exchange == null) + { + exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), + durable, false, 0); + _exchangeRegistry.registerExchange(exchange); + if (durable) + { + _durableConfig.createExchange(exchange); + } + } + else + { + throw new JMException("The exchange \"" + exchangeName + "\" already exists."); + } + } + } + catch (AMQException ex) + { + JMException jme = new JMException(ex.toString()); + throw new MBeanException(jme, "Error in creating exchange " + exchangeName); + } + finally + { + CurrentActor.remove(); + } + } + + /** + * Unregisters the exchange from registry. + * + * @param exchangeName + * @throws JMException + * @throws MBeanException + */ + public void unregisterExchange(String exchangeName) throws JMException, MBeanException + { + // TODO + // Check if the exchange is in use. + // boolean inUse = false; + // Check if there are queue-bindings with the exchange and unregister + // when there are no bindings. + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); + try + { + _exchangeRegistry.unregisterExchange(new AMQShortString(exchangeName), false); + } + catch (AMQException ex) + { + JMException jme = new JMException(ex.toString()); + throw new MBeanException(jme, "Error in unregistering exchange " + exchangeName); + } + finally + { + CurrentActor.remove(); + } + } + + /** + * Creates a new queue and registers it with the registry and puts it + * in persistance storage if durable queue. + * + * @param queueName + * @param durable + * @param owner + * @throws JMException + * @throws MBeanException + */ + public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException + { + AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); + if (queue != null) + { + throw new JMException("The queue \"" + queueName + "\" already exists."); + } + + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); + try + { + AMQShortString ownerShortString = null; + if (owner != null) + { + ownerShortString = new AMQShortString(owner); + } + + queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, getVirtualHost(), null); + if (queue.isDurable() && !queue.isAutoDelete()) + { + _durableConfig.createQueue(queue); + } + + _queueRegistry.registerQueue(queue); + } + catch (AMQException ex) + { + JMException jme = new JMException(ex.toString()); + throw new MBeanException(jme, "Error in creating queue " + queueName); + } + finally + { + CurrentActor.remove(); + } + } + + private VirtualHost getVirtualHost() + { + return _virtualHostMBean.getVirtualHost(); + } + + /** + * Deletes the queue from queue registry and persistant storage. + * + * @param queueName + * @throws JMException + * @throws MBeanException + */ + public void deleteQueue(String queueName) throws JMException, MBeanException + { + AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); + if (queue == null) + { + throw new JMException("The Queue " + queueName + " is not a registered queue."); + } + + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); + try + { + queue.delete(); + if (queue.isDurable()) + { + _durableConfig.removeQueue(queue); + } + } + catch (AMQException ex) + { + JMException jme = new JMException(ex.toString()); + throw new MBeanException(jme, "Error in deleting queue " + queueName); + } + finally + { + CurrentActor.remove(); + } + } + + @Override + public ManagedObject getParentObject() + { + return _virtualHostMBean; + } + + // This will have a single instance for a virtual host, so not having the name property in the ObjectName + @Override + public ObjectName getObjectName() throws MalformedObjectNameException + { + return getObjectNameForSingleInstanceMBean(); + } + + public void resetStatistics() throws Exception + { + getVirtualHost().resetStatistics(); + } + + public double getPeakMessageDeliveryRate() + { + return getVirtualHost().getMessageDeliveryStatistics().getPeak(); + } + + public double getPeakDataDeliveryRate() + { + return getVirtualHost().getDataDeliveryStatistics().getPeak(); + } + + public double getMessageDeliveryRate() + { + return getVirtualHost().getMessageDeliveryStatistics().getRate(); + } + + public double getDataDeliveryRate() + { + return getVirtualHost().getDataDeliveryStatistics().getRate(); + } + + public long getTotalMessagesDelivered() + { + return getVirtualHost().getMessageDeliveryStatistics().getTotal(); + } + + public long getTotalDataDelivered() + { + return getVirtualHost().getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return getVirtualHost().getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return getVirtualHost().getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return getVirtualHost().getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return getVirtualHost().getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return getVirtualHost().getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return getVirtualHost().getDataReceiptStatistics().getTotal(); + } + + public boolean isStatisticsEnabled() + { + return getVirtualHost().isStatisticsEnabled(); + } +} |