diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2006-11-27 16:51:44 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2006-11-27 16:51:44 +0000 |
commit | 306988f998d552a03fa44a639cf743f1bdd5e794 (patch) | |
tree | cd9663e80de72e06f92656ddb03cf9d9ed60e303 /java | |
parent | 9856b26ac19cfe6510b799af707dfdac095c8f59 (diff) | |
download | qpid-python-306988f998d552a03fa44a639cf743f1bdd5e794.tar.gz |
QPID-129
MBeans updated with improved features, like AMQQueue mbean now has separate features for sending message header and message content. ( other details are in JIRA)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479686 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
11 files changed, 403 insertions, 586 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index a1dabcd964..3470a5bb08 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,25 +20,13 @@ */ package org.apache.qpid.server; -import org.apache.qpid.framing.ProtocolVersionList; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; -import org.apache.qpid.server.protocol.AMQPProtocolProvider; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.management.*; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.AMQException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.commons.cli.*; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Logger; @@ -48,20 +36,40 @@ import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.ManagedBroker; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; +import org.apache.qpid.server.protocol.AMQPProtocolProvider; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.transport.ConnectorConfiguration; +import org.apache.qpid.url.URLSyntaxException; import javax.management.JMException; import javax.management.MBeanException; -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; import javax.management.MalformedObjectNameException; - +import javax.management.ObjectName; import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.StringTokenizer; import java.util.Collection; import java.util.List; +import java.util.StringTokenizer; /** * Main entry point for AMQPD. @@ -439,9 +447,9 @@ public class Main implements ProtocolVersionList { new AMQBrokerManager().register(); } - catch (NotCompliantMBeanException ex) + catch (JMException ex) { - throw new AMQException("Exception occured in creating AMQBrokerManager MBean."); + throw new AMQException("Exception occured in creating AMQBrokerManager MBean"); } } @@ -450,8 +458,7 @@ public class Main implements ProtocolVersionList * Broker level management features like creating and deleting exchanges and queue. */ @MBeanDescription("This MBean exposes the broker level management features") - private final class AMQBrokerManager extends AMQManagedObject - implements ManagedBroker + private final class AMQBrokerManager extends AMQManagedObject implements ManagedBroker { private final QueueRegistry _queueRegistry; private final ExchangeRegistry _exchangeRegistry; @@ -459,10 +466,9 @@ public class Main implements ProtocolVersionList private final MessageStore _messageStore; @MBeanConstructor("Creates the Broker Manager MBean") - protected AMQBrokerManager() throws NotCompliantMBeanException + protected AMQBrokerManager() throws JMException { super(ManagedBroker.class, ManagedBroker.TYPE); - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); _queueRegistry = appRegistry.getQueueRegistry(); _exchangeRegistry = appRegistry.getExchangeRegistry(); @@ -483,10 +489,7 @@ public class Main implements ProtocolVersionList * @param autoDelete * @throws JMException */ - public void createNewExchange(String exchangeName, - String type, - boolean durable, - boolean autoDelete) + public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete) throws JMException { try @@ -494,14 +497,9 @@ public class Main implements ProtocolVersionList synchronized(_exchangeRegistry) { Exchange exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) { - exchange = _exchangeFactory.createExchange(exchangeName, - type, //eg direct - durable, - autoDelete, - 0); //ticket no + exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0); _exchangeRegistry.registerExchange(exchange); } else @@ -522,8 +520,7 @@ public class Main implements ProtocolVersionList * @param exchangeName * @throws JMException */ - public void unregisterExchange(String exchangeName) - throws JMException + public void unregisterExchange(String exchangeName) throws JMException { boolean inUse = false; // TODO @@ -550,33 +547,28 @@ public class Main implements ProtocolVersionList * @param autoDelete * @throws JMException */ - public void createQueue(String queueName, - boolean durable, - String owner, - boolean autoDelete) + public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete) throws JMException { AMQQueue queue = _queueRegistry.getQueue(queueName); - if (queue == null) + if (queue != null) { - try - { - queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry); - if (queue.isDurable() && !queue.isAutoDelete()) - { - _messageStore.createQueue(queue); - } - _queueRegistry.registerQueue(queue); - } - catch (AMQException ex) + throw new JMException("The queue \"" + queueName + "\" already exists."); + } + + try + { + queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry); + if (queue.isDurable() && !queue.isAutoDelete()) { - _logger.error("Error in creating queue " + queueName, ex); - throw new MBeanException(ex, ex.toString()); + _messageStore.createQueue(queue); } + _queueRegistry.registerQueue(queue); } - else + catch (AMQException ex) { - throw new JMException("The queue \"" + queueName + "\" already exists."); + _logger.error("Error in creating queue " + queueName, ex); + throw new MBeanException(ex, ex.toString()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 69f5e862d6..d5ca567308 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -25,14 +25,16 @@ import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; public abstract class AbstractExchange implements Exchange, Managable { private String _name; protected boolean _durable; - + protected String _exchangeType; protected int _ticket; protected ExchangeMBean _exchangeMbean; @@ -64,6 +66,11 @@ public abstract class AbstractExchange implements Exchange, Managable return _name; } + public String getExchangeType() + { + return _exchangeType; + } + public Integer getTicketNo() { return _ticket; @@ -79,6 +86,13 @@ public abstract class AbstractExchange implements Exchange, Managable return _autoDelete; } + public ObjectName getObjectName() throws MalformedObjectNameException + { + String objNameString = super.getObjectName().toString(); + objNameString = objNameString + ",ExchangeType=" + _exchangeType; + return new ObjectName(objNameString); + } + } // End of MBean class public String getName() diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 085d0aad19..6dc97f9e48 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -24,15 +24,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import javax.management.JMException; import javax.management.MBeanException; -import javax.management.NotCompliantMBeanException; import javax.management.openmbean.*; import java.util.ArrayList; import java.util.List; @@ -53,53 +52,36 @@ public class DestNameExchange extends AbstractExchange @MBeanDescription("Management Bean for Direct Exchange") private final class DestNameExchangeMBean extends ExchangeMBean { - private String[] _bindingItemNames = {"BindingKey", "QueueNames"}; - private String[] _bindingItemDescriptions = {"Binding key", "Queue Names"}; - private String[] _bindingItemIndexNames = {"BindingKey"}; + // open mbean data types for representing exchange bindings + private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; private OpenType[] _bindingItemTypes = new OpenType[2]; - private CompositeType _bindingDataType = null; private TabularType _bindinglistDataType = null; private TabularDataSupport _bindingList = null; @MBeanConstructor("Creates an MBean for AMQ direct exchange") - public DestNameExchangeMBean() throws NotCompliantMBeanException + public DestNameExchangeMBean() throws JMException { super(); + _exchangeType = "direct"; init(); } /** * initialises the OpenType objects. */ - private void init() + private void init() throws OpenDataException { - try - { - _bindingItemTypes[0] = SimpleType.STRING; - //_bindingItemTypes[1] = ArrayType.getArrayType(SimpleType.STRING); - _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); - - _bindingDataType = new CompositeType("QueueBinding", - "Binding key and bound Queue names", - _bindingItemNames, - _bindingItemDescriptions, - _bindingItemTypes); - _bindinglistDataType = new TabularType("Bindings", - "List of queue bindings for " + getName() , - _bindingDataType, - _bindingItemIndexNames); - } - catch(OpenDataException ex) - { - //It should never occur. - _logger.error("OpenDataTypes could not be created.", ex); - throw new RuntimeException(ex); - } + _bindingItemTypes[0] = SimpleType.STRING; + _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); + _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names", + _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), + _bindingDataType, _bindingItemIndexNames); } - public TabularData viewBindings() - throws OpenDataException + public TabularData bindings() throws OpenDataException { Map<String, List<AMQQueue>> bindings = _index.getBindingsMap(); _bindingList = new TabularDataSupport(_bindinglistDataType); @@ -116,20 +98,16 @@ public class DestNameExchange extends AbstractExchange } Object[] bindingItemValues = {key, queueList.toArray(new String[0])}; - CompositeData bindingData = new CompositeDataSupport(_bindingDataType, - _bindingItemNames, - bindingItemValues); + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } return _bindingList; } - public void createBinding(String queueName, String binding) - throws JMException + public void createNewBinding(String queueName, String binding) throws JMException { AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); - if (queue == null) { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); @@ -155,10 +133,10 @@ public class DestNameExchange extends AbstractExchange { return new DestNameExchangeMBean(); } - catch (NotCompliantMBeanException ex) + catch (JMException ex) { - _logger.error("Exception occured in creating the DestNameExchenge", ex); - throw new AMQException("Exception occured in creating the DestNameExchenge", ex); + _logger.error("Exception occured in creating the direct exchange mbean", ex); + throw new AMQException("Exception occured in creating the direct exchange mbean", ex); } } @@ -172,8 +150,7 @@ public class DestNameExchange extends AbstractExchange } else { - _logger.debug("Binding queue " + queue + " with routing key " + routingKey - + " to exchange " + this); + _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index f6abd53076..a692f9ebca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -22,18 +22,17 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.management.MBeanConstructor; -import javax.management.openmbean.*; import javax.management.JMException; import javax.management.MBeanException; -import javax.management.NotCompliantMBeanException; +import javax.management.openmbean.*; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -53,55 +52,41 @@ public class DestWildExchange extends AbstractExchange @MBeanDescription("Management Bean for Topic Exchange") private final class DestWildExchangeMBean extends ExchangeMBean { - private String[] _bindingItemNames = {"BindingKey", "QueueNames"}; - private String[] _bindingItemDescriptions = {"Binding key", "Queue Names"}; - private String[] _bindingItemIndexNames = {"BindingKey"}; + // open mbean data types for representing exchange bindings + private String[] _bindingItemNames = {"Routing Key", "Queue Names"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; private OpenType[] _bindingItemTypes = new OpenType[2]; - private CompositeType _bindingDataType = null; private TabularType _bindinglistDataType = null; private TabularDataSupport _bindingList = null; @MBeanConstructor("Creates an MBean for AMQ topic exchange") - public DestWildExchangeMBean() throws NotCompliantMBeanException + public DestWildExchangeMBean() throws JMException { super(); + _exchangeType = "topic"; init(); } /** * initialises the OpenType objects. */ - private void init() + private void init() throws OpenDataException { - try - { - _bindingItemTypes[0] = SimpleType.STRING; - _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); - - _bindingDataType = new CompositeType("QueueBinding", - "Binding key and bound Queue names", - _bindingItemNames, - _bindingItemDescriptions, - _bindingItemTypes); - _bindinglistDataType = new TabularType("Bindings", - "List of queue bindings for " + getName(), - _bindingDataType, - _bindingItemIndexNames); - } - catch(OpenDataException ex) - { - //It should never occur. - _logger.error("OpenDataTypes could not be created.", ex); - throw new RuntimeException(ex); - } + _bindingItemTypes[0] = SimpleType.STRING; + _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); + _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names", + _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(), + _bindingDataType, _bindingItemIndexNames); } - public TabularData viewBindings() - throws OpenDataException + /** + * returns exchange bindings in tabular form + */ + public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry<String, List<AMQQueue>> entry : _routingKey2queues.entrySet()) { String key = entry.getKey(); @@ -114,20 +99,16 @@ public class DestWildExchange extends AbstractExchange } Object[] bindingItemValues = {key, queueList.toArray(new String[0])}; - CompositeData bindingData = new CompositeDataSupport(_bindingDataType, - _bindingItemNames, - bindingItemValues); + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } return _bindingList; } - public void createBinding(String queueName, String binding) - throws JMException + public void createNewBinding(String queueName, String binding) throws JMException { AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); - if (queue == null) throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); @@ -217,10 +198,10 @@ public class DestWildExchange extends AbstractExchange { return new DestWildExchangeMBean(); } - catch (NotCompliantMBeanException ex) + catch (JMException ex) { - _logger.error("Exception occured in creating the DestWildExchenge", ex); - throw new AMQException("Exception occured in creating the DestWildExchenge", ex); + _logger.error("Exception occured in creating the topic exchenge mbean", ex); + throw new AMQException("Exception occured in creating the topic exchenge mbean", ex); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 961d4ddf4c..586d6b8796 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import javax.management.JMException; -import javax.management.NotCompliantMBeanException; import javax.management.openmbean.*; import java.util.ArrayList; import java.util.Iterator; @@ -80,53 +79,39 @@ public class HeadersExchange extends AbstractExchange @MBeanDescription("Management Bean for Headers Exchange") private final class HeadersExchangeMBean extends ExchangeMBean { - private String[] _bindingItemNames = {"Queue", "HeaderBinding"}; - private String[] _bindingItemDescriptions = {"Queue Name", "Header attribute bindings"}; - private String[] _bindingItemIndexNames = {"HeaderBinding"}; - private OpenType[] _bindingItemTypes = new OpenType[2]; - + // open mbean data types for representing exchange bindings + private String[] _bindingItemNames = {"S.No.", "Queue Name", "Header Bindings"}; + private String[] _bindingItemIndexNames = {_bindingItemNames[0]}; + private OpenType[] _bindingItemTypes = new OpenType[3]; private CompositeType _bindingDataType = null; private TabularType _bindinglistDataType = null; private TabularDataSupport _bindingList = null; @MBeanConstructor("Creates an MBean for AMQ Headers exchange") - public HeadersExchangeMBean() throws NotCompliantMBeanException + public HeadersExchangeMBean() throws JMException { super(); + _exchangeType = "headers"; init(); } /** * initialises the OpenType objects. */ - private void init() + private void init() throws OpenDataException { - try - { - _bindingItemTypes[0] = SimpleType.STRING; - _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING); - - _bindingDataType = new CompositeType("QueueAndHeaderAttributesBinding", - "Queue name and header bindings", - _bindingItemNames, - _bindingItemDescriptions, - _bindingItemTypes); - _bindinglistDataType = new TabularType("HeaderBindings", - "List of queue bindings for " + getName(), - _bindingDataType, - _bindingItemIndexNames); - } - catch(OpenDataException ex) - { - //It should never occur. - _logger.error("OpenDataTypes could not be created.", ex); - throw new RuntimeException(ex); - } + _bindingItemTypes[0] = SimpleType.INTEGER; + _bindingItemTypes[1] = SimpleType.STRING; + _bindingItemTypes[2] = new ArrayType(1, SimpleType.STRING); + _bindingDataType = new CompositeType("Exchange Binding", "Queue name and header bindings", + _bindingItemNames, _bindingItemNames, _bindingItemTypes); + _bindinglistDataType = new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(), + _bindingDataType, _bindingItemIndexNames); } - public TabularData viewBindings() - throws OpenDataException + public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); + int count = 1; for (Iterator<Registration> itr = _bindings.iterator(); itr.hasNext();) { Registration registration = itr.next(); @@ -144,10 +129,8 @@ public class HeadersExchange extends AbstractExchange mappingList.add(key + "=" + value); } - Object[] bindingItemValues = {queueName, mappingList.toArray(new String[0])}; - CompositeData bindingData = new CompositeDataSupport(_bindingDataType, - _bindingItemNames, - bindingItemValues); + Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])}; + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -161,8 +144,7 @@ public class HeadersExchange extends AbstractExchange * @param binding * @throws JMException */ - public void createBinding(String queueName, String binding) - throws JMException + public void createNewBinding(String queueName, String binding) throws JMException { AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); @@ -240,7 +222,7 @@ public class HeadersExchange extends AbstractExchange { return new HeadersExchangeMBean(); } - catch (NotCompliantMBeanException ex) + catch (JMException ex) { _logger.error("Exception occured in creating the HeadersExchangeMBean", ex); throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java index 3c2c105186..b61aa233f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.server.management.MBeanAttribute; import org.apache.qpid.server.management.MBeanOperation; import org.apache.qpid.server.management.MBeanOperationParameter; +import org.apache.qpid.server.queue.ManagedQueue; import javax.management.openmbean.TabularData; import javax.management.JMException; @@ -44,9 +45,12 @@ public interface ManagedExchange * @return the name of the exchange. * @throws IOException */ - @MBeanAttribute(name="Name", description="Name of exchange") + @MBeanAttribute(name="Name", description=TYPE + " Name") String getName() throws IOException; + @MBeanAttribute(name="ExchangeType", description="Exchange Type") + String getExchangeType() throws IOException; + @MBeanAttribute(name="TicketNo", description="Exchange Ticket No") Integer getTicketNo() throws IOException; @@ -74,8 +78,8 @@ public interface ManagedExchange * @throws IOException * @throws JMException */ - @MBeanOperation(name="viewBindings", description="view the queue bindings for this exchange") - TabularData viewBindings() throws IOException, JMException; + @MBeanOperation(name="bindings", description="view the queue bindings for this exchange") + TabularData bindings() throws IOException, JMException; /** * Creates new binding with the given queue and binding. @@ -83,11 +87,11 @@ public interface ManagedExchange * @param binding * @throws JMException */ - @MBeanOperation(name="createBinding", - description="create a new binding with this exchange", - impact= MBeanOperationInfo.ACTION) - void createBinding(@MBeanOperationParameter(name="queue name", description="queue name") String queueName, - @MBeanOperationParameter(name="binding", description="queue binding")String binding) + @MBeanOperation(name="createNewBinding", + description="create a new binding with this exchange", + impact= MBeanOperationInfo.ACTION) + void createNewBinding(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue name") String queueName, + @MBeanOperationParameter(name="Binding", description="New binding")String binding) throws JMException; }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java b/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java index 266fb62fd8..aec7d6cb73 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.management; +import org.apache.qpid.server.exchange.ManagedExchange; +import org.apache.qpid.server.queue.ManagedQueue; + import javax.management.JMException; import javax.management.MBeanOperationInfo; import java.io.IOException; @@ -45,10 +48,9 @@ public interface ManagedBroker * @throws IOException * @throws JMException */ - @MBeanOperation(name="createNewExchange", description="Creates a new Exchange", - impact= MBeanOperationInfo.ACTION) + @MBeanOperation(name="createNewExchange", description="Creates a new Exchange", impact= MBeanOperationInfo.ACTION) void createNewExchange(@MBeanOperationParameter(name="name", description="Name of the new exchange")String name, - @MBeanOperationParameter(name="excahnge type", description="Type of the exchange")String type, + @MBeanOperationParameter(name="ExchangeType", description="Type of the exchange")String type, @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable, @MBeanOperationParameter(name="passive", description="true of the Exchange should be passive")boolean passive) throws IOException, JMException; @@ -61,9 +63,9 @@ public interface ManagedBroker * @throws JMException */ @MBeanOperation(name="unregisterExchange", - description="Unregisters all the related channels and queuebindings of this exchange", - impact= MBeanOperationInfo.ACTION) - void unregisterExchange(@MBeanOperationParameter(name="exchange name", description="Name of the exchange")String exchange) + description="Unregisters all the related channels and queuebindings of this exchange", + impact= MBeanOperationInfo.ACTION) + void unregisterExchange(@MBeanOperationParameter(name= ManagedExchange.TYPE, description="Exchange Name")String exchange) throws IOException, JMException; /** @@ -75,12 +77,11 @@ public interface ManagedBroker * @throws IOException * @throws JMException */ - @MBeanOperation(name="createQueue", description="Create a new Queue on the Broker server", - impact= MBeanOperationInfo.ACTION) - void createQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName, - @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable, - @MBeanOperationParameter(name="owner", description="Owner name")String owner, - @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete) + @MBeanOperation(name="createNewQueue", description="Create a new Queue on the Broker server", impact= MBeanOperationInfo.ACTION) + void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName, + @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable, + @MBeanOperationParameter(name="owner", description="Owner name")String owner, + @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete) throws IOException, JMException; /** @@ -93,6 +94,6 @@ public interface ManagedBroker @MBeanOperation(name="deleteQueue", description="Unregisters the Queue bindings, removes the subscriptions and deletes the queue", impact= MBeanOperationInfo.ACTION) - void deleteQueue(@MBeanOperationParameter(name="queue name", description="Name of the queue")String queueName) + void deleteQueue(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue Name")String queueName) throws IOException, JMException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 966af77d64..be8d2c4c82 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -52,7 +52,6 @@ import org.apache.qpid.server.state.AMQStateManager; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; -import javax.management.NotCompliantMBeanException; import javax.management.Notification; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.CompositeData; @@ -122,66 +121,35 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * augment the ManagedConnection interface and add the appropriate implementation here. */ @MBeanDescription("Management Bean for an AMQ Broker Connection") - private final class ManagedAMQProtocolSession extends AMQManagedObject - implements ManagedConnection + private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection { private String _name = null; - /** - * Represents the channel attributes sent with channel data. - */ - private String[] _channelAtttibuteNames = { "ChannelId", - "Transactional", - "DefaultQueue", - "UnacknowledgedMessageCount"}; - private String[] _channelAttributeDescriptions = { "Channel Identifier", - "is Channel Transactional?", - "Default Queue Name", - "Unacknowledged Message Count"}; - private OpenType[] _channelAttributeTypes = { SimpleType.INTEGER, - SimpleType.BOOLEAN, - SimpleType.STRING, - SimpleType.INTEGER}; - - private String[] _indexNames = { "ChannelId" }; //Channels in the list will be indexed according to channelId. - private CompositeType _channelType = null; // represents the data type for channel data - private TabularType _channelsType = null; // Datatype for list of channelsType + //openmbean data types for representing the channel attributes + private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; + private String[] _indexNames = {_channelAtttibuteNames[0]}; + private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; + private CompositeType _channelType = null; // represents the data type for channel data + private TabularType _channelsType = null; // Data type for list of channels type private TabularDataSupport _channelsList = null; @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public ManagedAMQProtocolSession() throws NotCompliantMBeanException + public ManagedAMQProtocolSession() throws JMException { super(ManagedConnection.class, ManagedConnection.TYPE); init(); } /** - * initialises the CompositeTypes and TabularType attributes. + * initialises the openmbean data types */ - private void init() + private void init() throws OpenDataException { String remote = getRemoteAddress(); remote = "anonymous".equals(remote) ? remote + hashCode() : remote; _name = jmxEncode(new StringBuffer(remote), 0).toString(); - - try - { - _channelType = new CompositeType("channel", - "Channel Details", - _channelAtttibuteNames, - _channelAttributeDescriptions, - _channelAttributeTypes); - - _channelsType = new TabularType("channelsType", - "List of available channels", - _channelType, - _indexNames); - } - catch(OpenDataException ex) - { - // It should never occur. - _logger.error("OpenDataTypes could not be created.", ex); - throw new RuntimeException(ex); - } + _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, + _channelAtttibuteNames, _channelAttributeTypes); + _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); } public Date getLastIoTime() @@ -204,12 +172,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _minaProtocolSession.getReadBytes(); } - public Long getMaximumNumberOfAllowedChannels() + public Long getMaximumNumberOfChannels() { return _maxNoOfChannels; } - public void setMaximumNumberOfAllowedChannels(Long value) + public void setMaximumNumberOfChannels(Long value) { _maxNoOfChannels = value; } @@ -264,30 +232,25 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * @return list of channels in tabular form. * @throws OpenDataException */ - public TabularData getChannels() throws OpenDataException + public TabularData channels() throws OpenDataException { _channelsList = new TabularDataSupport(_channelsType); for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet()) { AMQChannel channel = entry.getValue(); - Object[] itemValues = {channel.getChannelId(), - channel.isTransactional(), + Object[] itemValues = {channel.getChannelId(), channel.isTransactional(), (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null, channel.getUnacknowledgedMessageMap().size()}; - CompositeData channelData = new CompositeDataSupport(_channelType, - _channelAtttibuteNames, - itemValues); - + CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues); _channelsList.put(channelData); } return _channelsList; } - - public void closeChannel(int id) - throws Exception + + public void closeChannel(int id) throws Exception { try { @@ -299,8 +262,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - public void closeConnection() - throws Exception + public void closeConnection() throws Exception { try { @@ -315,13 +277,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, @Override public MBeanNotificationInfo[] getNotificationInfo() { - String[] notificationTypes = new String[] - {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; String name = MonitorNotification.class.getName(); - String description = "An attribute of this MBean has reached threshold value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, - name, - description); + String description = "Channel count has reached threshold value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); return new MBeanNotificationInfo[] {info1}; } @@ -329,14 +288,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private void checkForNotification() { int channelsCount = _channelMap.size(); - if (channelsCount >= getMaximumNumberOfAllowedChannels()) + if (channelsCount >= getMaximumNumberOfChannels()) { - Notification n = new Notification( - MonitorNotification.THRESHOLD_VALUE_EXCEEDED, - this, - ++_notificationSequenceNumber, - System.currentTimeMillis(), - "ChannelsCount = " + channelsCount + ", ChannelsCount has reached the threshold value"); + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + ++_notificationSequenceNumber, System.currentTimeMillis(), + "Channel count (" + channelsCount + ") has reached the threshold value"); _broadcaster.sendNotification(n); } @@ -372,10 +328,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return new ManagedAMQProtocolSession(); } - catch(NotCompliantMBeanException ex) + catch(JMException ex) { - _logger.error("AMQProtocolSession MBean creation has failed.", ex); - throw new AMQException("AMQProtocolSession MBean creation has failed.", ex); + _logger.error("AMQProtocolSession MBean creation has failed ", ex); + throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java index 889acd0142..2f3102b048 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java @@ -41,83 +41,57 @@ public interface ManagedConnection static final String TYPE = "Connection"; /** - * channel details of all the channels opened for this connection. - * @return general channel details - * @throws IOException - * @throws JMException + * Tells the remote address of this connection. + * @return remote address */ - @MBeanAttribute(name="Channels", - description="channel details of all the channels opened for this connection") - TabularData getChannels() throws IOException, JMException; + @MBeanAttribute(name="RemoteAddress", description=TYPE + " Address") + String getRemoteAddress(); /** * Tells the last time, the IO operation was done. * @return last IO time. */ - @MBeanAttribute(name="LastIOTime", - description="The last time, the IO operation was done") + @MBeanAttribute(name="LastIOTime", description="The last time, the IO operation was done") Date getLastIoTime(); /** - * Tells the remote address of this connection. - * @return remote address - */ - @MBeanAttribute(name="RemoteAddress", - description="The remote address of this connection") - String getRemoteAddress(); - - /** * Tells the total number of bytes written till now. * @return number of bytes written. */ - @MBeanAttribute(name="WrittenBytes", - description="The total number of bytes written till now") + @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now") Long getWrittenBytes(); /** * Tells the total number of bytes read till now. * @return number of bytes read. */ - @MBeanAttribute(name="ReadBytes", - description="The total number of bytes read till now") + @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now") Long getReadBytes(); /** - * Tells the maximum number of channels that can be opened using - * this connection. This is useful in setting notifications or + * Threshold high value for no of channels. This is useful in setting notifications or * taking required action is there are more channels being created. - * @return maximum number of channels allowed to be created. + * @return threshold limit for no of channels */ - Long getMaximumNumberOfAllowedChannels(); + Long getMaximumNumberOfChannels(); /** - * Sets the maximum number of channels allowed to be created using - * this connection. + * Sets the threshold high value for number of channels for a connection * @param value */ - @MBeanAttribute(name="MaximumNumberOfAllowedChannels", - description="The maximum number of channels that can be opened using this connection") - void setMaximumNumberOfAllowedChannels(Long value); + @MBeanAttribute(name="MaximumNumberOfChannels", description="The threshold high value for number of channels for this connection") + void setMaximumNumberOfChannels(Long value); //********** Operations *****************// /** - * Closes all the related channels and unregisters this connection from managed objects. - */ - @MBeanOperation(name="closeConnection", - description="Closes this connection and all related channels", - impact= MBeanOperationInfo.ACTION) - void closeConnection() throws Exception; - - /** - * Unsubscribes the consumers and unregisters the channel from managed objects. + * channel details of all the channels opened for this connection. + * @return general channel details + * @throws IOException + * @throws JMException */ - @MBeanOperation(name="closeChannel", - description="Closes the channel with given channeld and" + - "connected consumers will be unsubscribed", - impact= MBeanOperationInfo.ACTION) - void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) - throws Exception; + @MBeanOperation(name="channels", description="Channel details for this connection") + TabularData channels() throws IOException, JMException; /** * Commits the transactions if the channel is transactional. @@ -125,8 +99,8 @@ public interface ManagedConnection * @throws JMException */ @MBeanOperation(name="commitTransaction", - description="Commits the transactions for given channelID, if the channel is transactional", - impact= MBeanOperationInfo.ACTION) + description="Commits the transactions for given channel Id, if the channel is transactional", + impact= MBeanOperationInfo.ACTION) void commitTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException; /** @@ -135,7 +109,24 @@ public interface ManagedConnection * @throws JMException */ @MBeanOperation(name="rollbackTransactions", - description="Rollsback the transactions for given channelId, if the channel is transactional", - impact= MBeanOperationInfo.ACTION) + description="Rollsback the transactions for given channel Id, if the channel is transactional", + impact= MBeanOperationInfo.ACTION) void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException; + + /** + * Unsubscribes the consumers and unregisters the channel from managed objects. + */ + @MBeanOperation(name="closeChannel", + description="Closes the channel with given channel Id and connected consumers will be unsubscribed", + impact= MBeanOperationInfo.ACTION) + void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) + throws Exception; + + /** + * Closes all the related channels and unregisters this connection from managed objects. + */ + @MBeanOperation(name="closeConnection", + description="Closes this connection and all related channels", + impact= MBeanOperationInfo.ACTION) + void closeConnection() throws Exception; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 05fe8908c3..353a2007c0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -38,7 +39,6 @@ import org.apache.qpid.server.txn.TxnOp; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; -import javax.management.NotCompliantMBeanException; import javax.management.Notification; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.*; @@ -100,19 +100,19 @@ public class AMQQueue implements Managable, Comparable private final AMQQueueMBean _managedObject; /** - * max allowed size of a single message(in KBytes). + * max allowed size(KB) of a single message */ - private long _maxAllowedMessageSize = 10000; // 10 MB + private long _maxMessageSize = 10000; /** * max allowed number of messages on a queue. */ - private Integer _maxAllowedMessageCount = 10000; + private Integer _maxMessageCount = 10000; /** - * max allowed size in KBytes for all the messages combined together in a queue. + * max queue depth(KB) for the queue */ - private long _queueDepth = 10000000; // 10 GB + private long _maxQueueDepth = 10000000; /** * total messages received by the queue since startup. @@ -132,83 +132,45 @@ public class AMQQueue implements Managable, Comparable private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { private String _queueName = null; + // OpenMBean data types for viewMessages method + private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"}; + private String[] _msgAttributeIndex = {_msgAttributeNames[0]}; + private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. + private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. + private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - // AMQ message attribute names - private String[] _msgAttributeNames = {"MessageId", - "Header", - "Size", - "Redelivered" - }; - // AMQ Message attribute descriptions. - private String[] _msgAttributeDescriptions = {"Message Id", - "Header", - "Message size in bytes", - "Redelivered" - }; - - private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. - private String[] _msgAttributeIndex = {"MessageId"}; // Messages will be indexed according to the messageId. - private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. - private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. - - - private CompositeType _msgContentType = null; // For message content - private String[] _msgContentAttributes = {"MessageId", - "MimeType", - "Encoding", - "Content" - }; - private String[] _msgContentDescriptions = {"Message Id", - "MimeType", - "Encoding", - "Message content" - }; - private OpenType[] _msgContentAttributeTypes = new OpenType[4]; - - - @MBeanConstructor("Creates an MBean exposing an AMQQueue.") - public AMQQueueMBean() throws NotCompliantMBeanException + // OpenMBean data types for viewMessageContent method + private CompositeType _msgContentType = null; + private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"}; + private OpenType[] _msgContentAttributeTypes = new OpenType[4]; + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") + public AMQQueueMBean() throws JMException { super(ManagedQueue.class, ManagedQueue.TYPE); init(); } - private void init() + /** + * initialises the openmbean data types + */ + private void init() throws OpenDataException { _queueName = jmxEncode(new StringBuffer(_name), 0).toString(); - try - { - _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id - _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType - _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding - _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content - _msgContentType = new CompositeType("MessageContent", - "AMQ Message Content", - _msgContentAttributes, - _msgContentDescriptions, - _msgContentAttributeTypes); - - - _msgAttributeTypes[0] = SimpleType.LONG; // For message id - _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes - _msgAttributeTypes[2] = SimpleType.LONG; // For size - _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered - - _messageDataType = new CompositeType("Message", - "AMQ Message", - _msgAttributeNames, - _msgAttributeDescriptions, - _msgAttributeTypes); - _messagelistDataType = new TabularType("Messages", - "List of messages", - _messageDataType, - _msgAttributeIndex); - } - catch (OpenDataException ex) - { - _logger.error("OpenDataTypes could not be created.", ex); - throw new RuntimeException(ex); - } + _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id + _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType + _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding + _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content + _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, + _msgContentAttributes, _msgContentAttributeTypes); + + _msgAttributeTypes[0] = SimpleType.LONG; // For message id + _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes + _msgAttributeTypes[2] = SimpleType.LONG; // For size + _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered + + _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); + _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); } public String getObjectInstanceName() @@ -243,12 +205,12 @@ public class AMQQueue implements Managable, Comparable public Long getMaximumMessageSize() { - return _maxAllowedMessageSize; + return _maxMessageSize; } public void setMaximumMessageSize(Long value) { - _maxAllowedMessageSize = value; + _maxMessageSize = value; } public Integer getConsumerCount() @@ -268,27 +230,29 @@ public class AMQQueue implements Managable, Comparable public Integer getMaximumMessageCount() { - return _maxAllowedMessageCount; + return _maxMessageCount; } public void setMaximumMessageCount(Integer value) { - _maxAllowedMessageCount = value; + _maxMessageCount = value; } - public Long getQueueDepth() + public Long getMaximumQueueDepth() { - return _queueDepth; + return _maxQueueDepth; } // Sets the queue depth, the max queue size - public void setQueueDepth(Long value) + public void setMaximumQueueDepth(Long value) { - _queueDepth = value; + _maxQueueDepth = value; } - // Returns the size of messages in the queue - public Long getQueueSize() + /** + * returns the size of messages(KB) in the queue. + */ + public Long getQueueDepth() { List<AMQMessage> list = _deliveryMgr.getMessages(); if (list.size() == 0) @@ -296,15 +260,17 @@ public class AMQQueue implements Managable, Comparable return 0l; } - long queueSize = 0; + long queueDepth = 0; for (AMQMessage message : list) { - queueSize = queueSize + getMessageSize(message); + queueDepth = queueDepth + getMessageSize(message); } - return new Long(Math.round(queueSize / 100)); + return (long)Math.round(queueDepth / 1000); } - // calculates the size of an AMQMessage + /** + * returns size of message in bytes + */ private long getMessageSize(AMQMessage msg) { if (msg == null) @@ -312,53 +278,43 @@ public class AMQQueue implements Managable, Comparable return 0l; } - List<ContentBody> cBodies = msg.getContentBodies(); - long messageSize = 0; - for (ContentBody body : cBodies) - { - if (body != null) - { - messageSize = messageSize + body.getSize(); - } - } - return messageSize; + return msg.getContentHeaderBody().bodySize; } - // Checks if there is any notification to be send to the listeners + /** + * Checks if there is any notification to be send to the listeners + */ private void checkForNotification(AMQMessage msg) { - // Check for message count + // Check for threshold message count Integer msgCount = getMessageCount(); if (msgCount >= getMaximumMessageCount()) { - notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full."); + notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value"); } - // Check for received message size + // Check for threshold message size long messageSize = getMessageSize(msg); - if (messageSize >= getMaximumMessageSize()) + if (messageSize >= _maxMessageSize) { - notifyClients("MessageSize = " + messageSize + ", Message size (MessageID=" + msg.getMessageId() + - ")is higher than the threshold value"); + notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value"); } - // Check for queue size in bytes - long queueSize = getQueueSize(); - if (queueSize >= getQueueDepth()) + // Check for threshold queue depth in bytes + long queueDepth = getQueueDepth(); + if (queueDepth >= _maxQueueDepth) { - notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value"); + notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); } } - // Send the notification to the listeners + /** + * Sends the notification to the listeners + */ private void notifyClients(String notificationMsg) { - Notification n = new Notification( - MonitorNotification.THRESHOLD_VALUE_EXCEEDED, - this, - ++_notificationSequenceNumber, - System.currentTimeMillis(), - notificationMsg); + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } @@ -387,10 +343,12 @@ public class AMQQueue implements Managable, Comparable } } + /** + * returns message content as byte array and related attributes for the given message id. + */ public CompositeData viewMessageContent(long msgId) throws JMException { List<AMQMessage> list = _deliveryMgr.getMessages(); - CompositeData messageContent = null; AMQMessage msg = null; for (AMQMessage message : list) { @@ -401,77 +359,55 @@ public class AMQQueue implements Managable, Comparable } } - if (msg != null) + if (msg == null) { - // get message content - List<ContentBody> cBodies = msg.getContentBodies(); - List<Byte> msgContent = new ArrayList<Byte>(); - for (ContentBody body : cBodies) + throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName ); + } + // get message content + List<ContentBody> cBodies = msg.getContentBodies(); + List<Byte> msgContent = new ArrayList<Byte>(); + for (ContentBody body : cBodies) + { + if (body.getSize() != 0) { - if (body.getSize() != 0) + ByteBuffer slice = body.payload.slice(); + for (int j = 0; j < slice.limit(); j++) { - ByteBuffer slice = body.payload.slice(); - for (int j = 0; j < slice.limit(); j++) - { - msgContent.add(slice.get()); - } + msgContent.add(slice.get()); } } - - // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = headerProperties.getContentType(); - String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); - - Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; - messageContent = new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); - } - else - { - throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } - return messageContent; + // Create header attributes list + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties; + String mimeType = headerProperties.getContentType(); + String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; + + return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); } /** - * Returns the messages stored in this queue in tabular form. - * - * @param beginIndex - * @param endIndex - * @return AMQ messages in tabular form. - * @throws JMException + * Returns the header contents of the messages stored in this queue in tabular form. */ public TabularData viewMessages(int beginIndex, int endIndex) throws JMException { if ((beginIndex > endIndex) || (beginIndex < 1)) { - throw new JMException("FromIndex = " + beginIndex + ", ToIndex = " + endIndex + - "\nFromIndex should be greater than 0 and less than ToIndex"); + throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex + + "\nFrom Index should be greater than 0 and less than To Index"); } List<AMQMessage> list = _deliveryMgr.getMessages(); TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); - if (beginIndex > list.size()) - { - return _messageList; - } - endIndex = endIndex < list.size() ? endIndex : list.size(); - - for (int i = beginIndex; i <= endIndex; i++) + // Create the tabular list of message header contents + for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) { AMQMessage msg = list.get(i - 1); - long size = 0; - // get message content - List<ContentBody> cBodies = msg.getContentBodies(); - for (ContentBody body : cBodies) - { - size = size + body.getSize(); - } - + ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties; List<String> headerAttribsList = new ArrayList<String>(); headerAttribsList.add("App Id=" + headerProperties.getAppId()); headerAttribsList.add("MimeType=" + headerProperties.getContentType()); @@ -479,13 +415,9 @@ public class AMQQueue implements Managable, Comparable headerAttribsList.add("Encoding=" + headerProperties.getEncoding()); headerAttribsList.add(headerProperties.toString()); - Object[] itemValues = {msg.getMessageId(), - headerAttribsList.toArray(new String[0]), - size, msg.isRedelivered()}; - - CompositeData messageData = new CompositeDataSupport(_messageDataType, - _msgAttributeNames, - itemValues); + Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]), + headerBody.bodySize, msg.isRedelivered()}; + CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); } @@ -493,20 +425,15 @@ public class AMQQueue implements Managable, Comparable } /** - * Creates all the notifications this MBean can send. - * - * @return Notifications broadcasted by this MBean. + * returns Notifications sent by this MBean. */ @Override public MBeanNotificationInfo[] getNotificationInfo() { - String[] notificationTypes = new String[] - {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; String name = MonitorNotification.class.getName(); - String description = "An attribute of this MBean has reached threshold value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, - name, - description); + String description = "Either Message count or Queue depth or Message size has reached threshold high value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); return new MBeanNotificationInfo[]{info1}; } @@ -608,9 +535,9 @@ public class AMQQueue implements Managable, Comparable { return new AMQQueueMBean(); } - catch (NotCompliantMBeanException ex) + catch (JMException ex) { - throw new AMQException("AMQQueue MBean creation has failed.", ex); + throw new AMQException("AMQQueue MBean creation has failed ", ex); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java index 3a818cf31a..de5d0f55a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java @@ -45,16 +45,48 @@ public interface ManagedQueue * @return the name of the managedQueue. * @throws IOException */ - @MBeanAttribute(name="Name", description = "Name of the " + TYPE) + @MBeanAttribute(name="Name", description = TYPE + " Name") String getName() throws IOException; /** - * Tells whether this ManagedQueue is durable or not. - * @return true if this ManagedQueue is a durable queue. + * Total number of messages on the queue, which are yet to be delivered to the consumer(s). + * @return number of undelivered message in the Queue. * @throws IOException */ - @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable") - boolean isDurable() throws IOException; + @MBeanAttribute(name="MessageCount", description = "Total number of undelivered messages on the queue") + Integer getMessageCount() throws IOException; + + /** + * Tells the total number of messages receieved by the queue since startup. + * @return total number of messages received. + * @throws IOException + */ + @MBeanAttribute(name="ReceivedMessageCount", description="The total number of messages receieved by the queue since startup") + Long getReceivedMessageCount() throws IOException; + + /** + * Size of messages in the queue + * @return + * @throws IOException + */ + @MBeanAttribute(name="QueueDepth", description="Size of messages(KB) in the queue") + Long getQueueDepth() throws IOException; + + /** + * Returns the total number of active subscribers to the queue. + * @return the number of active subscribers + * @throws IOException + */ + @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue") + Integer getActiveConsumerCount() throws IOException; + + /** + * Returns the total number of subscribers to the queue. + * @return the number of subscribers. + * @throws IOException + */ + @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue") + Integer getConsumerCount() throws IOException; /** * Tells the Owner of the ManagedQueue. @@ -65,21 +97,20 @@ public interface ManagedQueue String getOwner() throws IOException; /** - * Tells if the ManagedQueue is set to AutoDelete. - * @return true if the ManagedQueue is set to AutoDelete. + * Tells whether this ManagedQueue is durable or not. + * @return true if this ManagedQueue is a durable queue. * @throws IOException */ - @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete") - boolean isAutoDelete() throws IOException; + @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable") + boolean isDurable() throws IOException; /** - * Total number of messages on the queue, which are yet to be delivered to the consumer(s). - * @return number of undelivered message in the Queue. + * Tells if the ManagedQueue is set to AutoDelete. + * @return true if the ManagedQueue is set to AutoDelete. * @throws IOException */ - @MBeanAttribute(name="MessageCount", - description = "Total number of undelivered messages on the queue") - Integer getMessageCount() throws IOException; + @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete") + boolean isAutoDelete() throws IOException; /** * Returns the maximum size of a message (in kbytes) allowed to be accepted by the @@ -98,36 +129,10 @@ public interface ManagedQueue * @param size maximum size of message. * @throws IOException */ - @MBeanAttribute(name="MaximumMessageSize", - description="Maximum size(KB) of a message allowed for this Queue") + @MBeanAttribute(name="MaximumMessageSize", description="Threshold high value(KB) for a message size") void setMaximumMessageSize(Long size) throws IOException; /** - * Returns the total number of subscribers to the queue. - * @return the number of subscribers. - * @throws IOException - */ - @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue") - Integer getConsumerCount() throws IOException; - - /** - * Returns the total number of active subscribers to the queue. - * @return the number of active subscribers - * @throws IOException - */ - @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue") - Integer getActiveConsumerCount() throws IOException; - - /** - * Tells the total number of messages receieved by the queue since startup. - * @return total number of messages received. - * @throws IOException - */ - @MBeanAttribute(name="ReceivedMessageCount", - description="The total number of messages receieved by the queue since startup") - Long getReceivedMessageCount() throws IOException; - - /** * Tells the maximum number of messages that can be stored in the queue. * This is useful in setting the notifications or taking required * action is the number of message increase this limit. @@ -141,27 +146,16 @@ public interface ManagedQueue * @param value the maximum number of messages allowed to be stored in the queue. * @throws IOException */ - @MBeanAttribute(name="MaximumMessageCount", - description="The maximum number of messages allowed to be stored in the queue") + @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue") void setMaximumMessageCount(Integer value) throws IOException; /** - * Size of messages in the queue - * @return - * @throws IOException - */ - @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue") - Long getQueueSize() throws IOException; - - /** - * Tells the maximum size of all the messages combined together, - * that can be stored in the queue. This is useful for setting notifications - * or taking required action if the size of messages stored in the queue - * increases over this limit. - * @return maximum size of the all the messages allowed for the queue. + * This is useful for setting notifications or taking required action if the size of messages + * stored in the queue increases over this limit. + * @return threshold high value for Queue Depth * @throws IOException */ - Long getQueueDepth() throws IOException; + Long getMaximumQueueDepth() throws IOException; /** * Sets the maximum size of all the messages together, that can be stored @@ -169,9 +163,8 @@ public interface ManagedQueue * @param value * @throws IOException */ - @MBeanAttribute(name="QueueDepth", - description="The size(KB) of all the messages together, that can be stored in the queue") - void setQueueDepth(Long value) throws IOException; + @MBeanAttribute(name="MaximumQueueDepth", description="The threshold high value(KB) for Queue Depth") + void setMaximumQueueDepth(Long value) throws IOException; @@ -188,19 +181,22 @@ public interface ManagedQueue * @throws JMException */ @MBeanOperation(name="viewMessages", - description="shows messages in this queue with given indexes. eg. from index 1 - 100") + description="Message headers for messages in this queue within given index range. eg. from index 1 - 100") TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex, @MBeanOperationParameter(name="to index", description="to index")int toIndex) throws IOException, JMException; + @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id") + CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId) + throws IOException, JMException; + /** * Deletes the first message from top. * @throws IOException * @throws JMException */ - @MBeanOperation(name="deleteMessageFromTop", - description="Deletes the first message from top", - impact= MBeanOperationInfo.ACTION) + @MBeanOperation(name="deleteMessageFromTop", description="Deletes the first message from top", + impact= MBeanOperationInfo.ACTION) void deleteMessageFromTop() throws IOException, JMException; /** @@ -209,12 +205,8 @@ public interface ManagedQueue * @throws JMException */ @MBeanOperation(name="clearQueue", - description="Clears the queue by deleting all the undelivered messages from the queue", - impact= MBeanOperationInfo.ACTION) + description="Clears the queue by deleting all the undelivered messages from the queue", + impact= MBeanOperationInfo.ACTION) void clearQueue() throws IOException, JMException; - @MBeanOperation(name="viewMessageContent", - description="Returns the message content along with MimeType and Encoding") - CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId) - throws IOException, JMException; } |