/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.qpid.server; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueueMBean; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; /** * This MBean implements the broker management interface and exposes the * Broker level management features like creating and deleting exchanges and queue. */ @MBeanDescription("This MBean exposes the broker level management features") public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBroker { private final QueueRegistry _queueRegistry; private final ExchangeRegistry _exchangeRegistry; private final ExchangeFactory _exchangeFactory; private final DurableConfigurationStore _durableConfig; private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean; @MBeanConstructor("Creates the Broker Manager MBean") public AMQBrokerManagerMBean(VirtualHostImpl.VirtualHostMBean virtualHostMBean) throws JMException { super(ManagedBroker.class, ManagedBroker.TYPE); _virtualHostMBean = virtualHostMBean; VirtualHost virtualHost = virtualHostMBean.getVirtualHost(); _queueRegistry = virtualHost.getQueueRegistry(); _exchangeRegistry = virtualHost.getExchangeRegistry(); _durableConfig = virtualHost.getDurableConfigurationStore(); _exchangeFactory = virtualHost.getExchangeFactory(); } public String getObjectInstanceName() { return _virtualHostMBean.getVirtualHost().getName(); } /** * Returns an array of the exchange types available for creation. * @since Qpid JMX API 1.3 * @throws IOException */ public String[] getExchangeTypes() throws IOException { ArrayList exchangeTypes = new ArrayList(); for(ExchangeType ex : _exchangeFactory.getPublicCreatableTypes()) { exchangeTypes.add(ex.getName().toString()); } return exchangeTypes.toArray(new String[0]); } /** * Returns a list containing the names of the attributes available for the Queue mbeans. * @since Qpid JMX API 1.3 * @throws IOException */ public List retrieveQueueAttributeNames() throws IOException { return ManagedQueue.QUEUE_ATTRIBUTES; } /** * Returns a List of Object Lists containing the requested attribute values (in the same sequence requested) for each queue in the virtualhost. * If a particular attribute cant be found or raises an mbean/reflection exception whilst being gathered its value is substituted with the String "-". * @since Qpid JMX API 1.3 * @throws IOException */ public List> retrieveQueueAttributeValues(String[] attributes) throws IOException { if(_queueRegistry.getQueues().size() == 0) { return new ArrayList>(); } List> queueAttributesList = new ArrayList>(_queueRegistry.getQueues().size()); int attributesLength = attributes.length; for(AMQQueue queue : _queueRegistry.getQueues()) { AMQQueueMBean mbean = (AMQQueueMBean) queue.getManagedObject(); if(mbean == null) { continue; } List attributeValues = new ArrayList(attributesLength); for(int i=0; i < attributesLength; i++) { try { attributeValues.add(mbean.getAttribute(attributes[i])); } catch (Exception e) { attributeValues.add("-"); } } queueAttributesList.add(attributeValues); } return queueAttributesList; } /** * Creates new exchange and registers it with the registry. * * @param exchangeName * @param type * @param durable * @throws JMException * @throws MBeanException */ public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException, MBeanException { CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { synchronized (_exchangeRegistry) { Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); if (exchange == null) { exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, false, 0); _exchangeRegistry.registerExchange(exchange); if (durable) { _durableConfig.createExchange(exchange); } } else { throw new JMException("The exchange \"" + exchangeName + "\" already exists."); } } } catch (AMQException ex) { JMException jme = new JMException(ex.toString()); throw new MBeanException(jme, "Error in creating exchange " + exchangeName); } finally { CurrentActor.remove(); } } /** * Unregisters the exchange from registry. * * @param exchangeName * @throws JMException * @throws MBeanException */ public void unregisterExchange(String exchangeName) throws JMException, MBeanException { // TODO // Check if the exchange is in use. // boolean inUse = false; // Check if there are queue-bindings with the exchange and unregister // when there are no bindings. CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { _exchangeRegistry.unregisterExchange(new AMQShortString(exchangeName), false); } catch (AMQException ex) { JMException jme = new JMException(ex.toString()); throw new MBeanException(jme, "Error in unregistering exchange " + exchangeName); } finally { CurrentActor.remove(); } } /** * Creates a new queue and registers it with the registry and puts it * in persistance storage if durable queue. * * @param queueName * @param durable * @param owner * @throws JMException * @throws MBeanException */ public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException { AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); if (queue != null) { throw new JMException("The queue \"" + queueName + "\" already exists."); } CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { AMQShortString ownerShortString = null; if (owner != null) { ownerShortString = new AMQShortString(owner); } queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, getVirtualHost(), null); if (queue.isDurable() && !queue.isAutoDelete()) { _durableConfig.createQueue(queue); } _queueRegistry.registerQueue(queue); } catch (AMQException ex) { JMException jme = new JMException(ex.toString()); throw new MBeanException(jme, "Error in creating queue " + queueName); } finally { CurrentActor.remove(); } } private VirtualHost getVirtualHost() { return _virtualHostMBean.getVirtualHost(); } /** * Deletes the queue from queue registry and persistant storage. * * @param queueName * @throws JMException * @throws MBeanException */ public void deleteQueue(String queueName) throws JMException, MBeanException { AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); if (queue == null) { throw new JMException("The Queue " + queueName + " is not a registered queue."); } CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { queue.delete(); if (queue.isDurable()) { _durableConfig.removeQueue(queue); } } catch (AMQException ex) { JMException jme = new JMException(ex.toString()); throw new MBeanException(jme, "Error in deleting queue " + queueName); } finally { CurrentActor.remove(); } } @Override public ManagedObject getParentObject() { return _virtualHostMBean; } // This will have a single instance for a virtual host, so not having the name property in the ObjectName @Override public ObjectName getObjectName() throws MalformedObjectNameException { return getObjectNameForSingleInstanceMBean(); } public void resetStatistics() throws Exception { getVirtualHost().resetStatistics(); } public double getPeakMessageDeliveryRate() { return getVirtualHost().getMessageDeliveryStatistics().getPeak(); } public double getPeakDataDeliveryRate() { return getVirtualHost().getDataDeliveryStatistics().getPeak(); } public double getMessageDeliveryRate() { return getVirtualHost().getMessageDeliveryStatistics().getRate(); } public double getDataDeliveryRate() { return getVirtualHost().getDataDeliveryStatistics().getRate(); } public long getTotalMessagesDelivered() { return getVirtualHost().getMessageDeliveryStatistics().getTotal(); } public long getTotalDataDelivered() { return getVirtualHost().getDataDeliveryStatistics().getTotal(); } public double getPeakMessageReceiptRate() { return getVirtualHost().getMessageReceiptStatistics().getPeak(); } public double getPeakDataReceiptRate() { return getVirtualHost().getDataReceiptStatistics().getPeak(); } public double getMessageReceiptRate() { return getVirtualHost().getMessageReceiptStatistics().getRate(); } public double getDataReceiptRate() { return getVirtualHost().getDataReceiptStatistics().getRate(); } public long getTotalMessagesReceived() { return getVirtualHost().getMessageReceiptStatistics().getTotal(); } public long getTotalDataReceived() { return getVirtualHost().getDataReceiptStatistics().getTotal(); } public boolean isStatisticsEnabled() { return getVirtualHost().isStatisticsEnabled(); } }