summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2006-11-27 16:51:44 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2006-11-27 16:51:44 +0000
commit306988f998d552a03fa44a639cf743f1bdd5e794 (patch)
treecd9663e80de72e06f92656ddb03cf9d9ed60e303 /java
parent9856b26ac19cfe6510b799af707dfdac095c8f59 (diff)
downloadqpid-python-306988f998d552a03fa44a639cf743f1bdd5e794.tar.gz
QPID-129
MBeans updated with improved features, like AMQQueue mbean now has separate features for sending message header and message content. ( other details are in JIRA) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479686 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java112
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java63
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java58
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java106
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java89
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java295
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java132
11 files changed, 403 insertions, 586 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index a1dabcd964..3470a5bb08 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -20,25 +20,13 @@
*/
package org.apache.qpid.server;
-import org.apache.qpid.framing.ProtocolVersionList;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.transport.ConnectorConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.management.*;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.commons.cli.*;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
@@ -48,20 +36,40 @@ import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedBroker;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.ConnectorConfiguration;
+import org.apache.qpid.url.URLSyntaxException;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
-
+import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.util.StringTokenizer;
import java.util.Collection;
import java.util.List;
+import java.util.StringTokenizer;
/**
* Main entry point for AMQPD.
@@ -439,9 +447,9 @@ public class Main implements ProtocolVersionList
{
new AMQBrokerManager().register();
}
- catch (NotCompliantMBeanException ex)
+ catch (JMException ex)
{
- throw new AMQException("Exception occured in creating AMQBrokerManager MBean.");
+ throw new AMQException("Exception occured in creating AMQBrokerManager MBean");
}
}
@@ -450,8 +458,7 @@ public class Main implements ProtocolVersionList
* Broker level management features like creating and deleting exchanges and queue.
*/
@MBeanDescription("This MBean exposes the broker level management features")
- private final class AMQBrokerManager extends AMQManagedObject
- implements ManagedBroker
+ private final class AMQBrokerManager extends AMQManagedObject implements ManagedBroker
{
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
@@ -459,10 +466,9 @@ public class Main implements ProtocolVersionList
private final MessageStore _messageStore;
@MBeanConstructor("Creates the Broker Manager MBean")
- protected AMQBrokerManager() throws NotCompliantMBeanException
+ protected AMQBrokerManager() throws JMException
{
super(ManagedBroker.class, ManagedBroker.TYPE);
-
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_queueRegistry = appRegistry.getQueueRegistry();
_exchangeRegistry = appRegistry.getExchangeRegistry();
@@ -483,10 +489,7 @@ public class Main implements ProtocolVersionList
* @param autoDelete
* @throws JMException
*/
- public void createNewExchange(String exchangeName,
- String type,
- boolean durable,
- boolean autoDelete)
+ public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete)
throws JMException
{
try
@@ -494,14 +497,9 @@ public class Main implements ProtocolVersionList
synchronized(_exchangeRegistry)
{
Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
-
if (exchange == null)
{
- exchange = _exchangeFactory.createExchange(exchangeName,
- type, //eg direct
- durable,
- autoDelete,
- 0); //ticket no
+ exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0);
_exchangeRegistry.registerExchange(exchange);
}
else
@@ -522,8 +520,7 @@ public class Main implements ProtocolVersionList
* @param exchangeName
* @throws JMException
*/
- public void unregisterExchange(String exchangeName)
- throws JMException
+ public void unregisterExchange(String exchangeName) throws JMException
{
boolean inUse = false;
// TODO
@@ -550,33 +547,28 @@ public class Main implements ProtocolVersionList
* @param autoDelete
* @throws JMException
*/
- public void createQueue(String queueName,
- boolean durable,
- String owner,
- boolean autoDelete)
+ public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete)
throws JMException
{
AMQQueue queue = _queueRegistry.getQueue(queueName);
- if (queue == null)
+ if (queue != null)
{
- try
- {
- queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- _messageStore.createQueue(queue);
- }
- _queueRegistry.registerQueue(queue);
- }
- catch (AMQException ex)
+ throw new JMException("The queue \"" + queueName + "\" already exists.");
+ }
+
+ try
+ {
+ queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
+ if (queue.isDurable() && !queue.isAutoDelete())
{
- _logger.error("Error in creating queue " + queueName, ex);
- throw new MBeanException(ex, ex.toString());
+ _messageStore.createQueue(queue);
}
+ _queueRegistry.registerQueue(queue);
}
- else
+ catch (AMQException ex)
{
- throw new JMException("The queue \"" + queueName + "\" already exists.");
+ _logger.error("Error in creating queue " + queueName, ex);
+ throw new MBeanException(ex, ex.toString());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 69f5e862d6..d5ca567308 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -25,14 +25,16 @@ import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
public abstract class AbstractExchange implements Exchange, Managable
{
private String _name;
protected boolean _durable;
-
+ protected String _exchangeType;
protected int _ticket;
protected ExchangeMBean _exchangeMbean;
@@ -64,6 +66,11 @@ public abstract class AbstractExchange implements Exchange, Managable
return _name;
}
+ public String getExchangeType()
+ {
+ return _exchangeType;
+ }
+
public Integer getTicketNo()
{
return _ticket;
@@ -79,6 +86,13 @@ public abstract class AbstractExchange implements Exchange, Managable
return _autoDelete;
}
+ public ObjectName getObjectName() throws MalformedObjectNameException
+ {
+ String objNameString = super.getObjectName().toString();
+ objNameString = objNameString + ",ExchangeType=" + _exchangeType;
+ return new ObjectName(objNameString);
+ }
+
} // End of MBean class
public String getName()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 085d0aad19..6dc97f9e48 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -24,15 +24,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.NotCompliantMBeanException;
import javax.management.openmbean.*;
import java.util.ArrayList;
import java.util.List;
@@ -53,53 +52,36 @@ public class DestNameExchange extends AbstractExchange
@MBeanDescription("Management Bean for Direct Exchange")
private final class DestNameExchangeMBean extends ExchangeMBean
{
- private String[] _bindingItemNames = {"BindingKey", "QueueNames"};
- private String[] _bindingItemDescriptions = {"Binding key", "Queue Names"};
- private String[] _bindingItemIndexNames = {"BindingKey"};
+ // open mbean data types for representing exchange bindings
+ private String[] _bindingItemNames = {"Routing Key", "Queue Names"};
+ private String[] _bindingItemIndexNames = {_bindingItemNames[0]};
private OpenType[] _bindingItemTypes = new OpenType[2];
-
private CompositeType _bindingDataType = null;
private TabularType _bindinglistDataType = null;
private TabularDataSupport _bindingList = null;
@MBeanConstructor("Creates an MBean for AMQ direct exchange")
- public DestNameExchangeMBean() throws NotCompliantMBeanException
+ public DestNameExchangeMBean() throws JMException
{
super();
+ _exchangeType = "direct";
init();
}
/**
* initialises the OpenType objects.
*/
- private void init()
+ private void init() throws OpenDataException
{
- try
- {
- _bindingItemTypes[0] = SimpleType.STRING;
- //_bindingItemTypes[1] = ArrayType.getArrayType(SimpleType.STRING);
- _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
-
- _bindingDataType = new CompositeType("QueueBinding",
- "Binding key and bound Queue names",
- _bindingItemNames,
- _bindingItemDescriptions,
- _bindingItemTypes);
- _bindinglistDataType = new TabularType("Bindings",
- "List of queue bindings for " + getName() ,
- _bindingDataType,
- _bindingItemIndexNames);
- }
- catch(OpenDataException ex)
- {
- //It should never occur.
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _bindingItemTypes[0] = SimpleType.STRING;
+ _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+ _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names",
+ _bindingItemNames, _bindingItemNames, _bindingItemTypes);
+ _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(),
+ _bindingDataType, _bindingItemIndexNames);
}
- public TabularData viewBindings()
- throws OpenDataException
+ public TabularData bindings() throws OpenDataException
{
Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
_bindingList = new TabularDataSupport(_bindinglistDataType);
@@ -116,20 +98,16 @@ public class DestNameExchange extends AbstractExchange
}
Object[] bindingItemValues = {key, queueList.toArray(new String[0])};
- CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
- _bindingItemNames,
- bindingItemValues);
+ CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
return _bindingList;
}
- public void createBinding(String queueName, String binding)
- throws JMException
+ public void createNewBinding(String queueName, String binding) throws JMException
{
AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
-
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
@@ -155,10 +133,10 @@ public class DestNameExchange extends AbstractExchange
{
return new DestNameExchangeMBean();
}
- catch (NotCompliantMBeanException ex)
+ catch (JMException ex)
{
- _logger.error("Exception occured in creating the DestNameExchenge", ex);
- throw new AMQException("Exception occured in creating the DestNameExchenge", ex);
+ _logger.error("Exception occured in creating the direct exchange mbean", ex);
+ throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
}
}
@@ -172,8 +150,7 @@ public class DestNameExchange extends AbstractExchange
}
else
{
- _logger.debug("Binding queue " + queue + " with routing key " + routingKey
- + " to exchange " + this);
+ _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index f6abd53076..a692f9ebca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -22,18 +22,17 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.MBeanConstructor;
-import javax.management.openmbean.*;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.NotCompliantMBeanException;
+import javax.management.openmbean.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -53,55 +52,41 @@ public class DestWildExchange extends AbstractExchange
@MBeanDescription("Management Bean for Topic Exchange")
private final class DestWildExchangeMBean extends ExchangeMBean
{
- private String[] _bindingItemNames = {"BindingKey", "QueueNames"};
- private String[] _bindingItemDescriptions = {"Binding key", "Queue Names"};
- private String[] _bindingItemIndexNames = {"BindingKey"};
+ // open mbean data types for representing exchange bindings
+ private String[] _bindingItemNames = {"Routing Key", "Queue Names"};
+ private String[] _bindingItemIndexNames = {_bindingItemNames[0]};
private OpenType[] _bindingItemTypes = new OpenType[2];
-
private CompositeType _bindingDataType = null;
private TabularType _bindinglistDataType = null;
private TabularDataSupport _bindingList = null;
@MBeanConstructor("Creates an MBean for AMQ topic exchange")
- public DestWildExchangeMBean() throws NotCompliantMBeanException
+ public DestWildExchangeMBean() throws JMException
{
super();
+ _exchangeType = "topic";
init();
}
/**
* initialises the OpenType objects.
*/
- private void init()
+ private void init() throws OpenDataException
{
- try
- {
- _bindingItemTypes[0] = SimpleType.STRING;
- _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
-
- _bindingDataType = new CompositeType("QueueBinding",
- "Binding key and bound Queue names",
- _bindingItemNames,
- _bindingItemDescriptions,
- _bindingItemTypes);
- _bindinglistDataType = new TabularType("Bindings",
- "List of queue bindings for " + getName(),
- _bindingDataType,
- _bindingItemIndexNames);
- }
- catch(OpenDataException ex)
- {
- //It should never occur.
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _bindingItemTypes[0] = SimpleType.STRING;
+ _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+ _bindingDataType = new CompositeType("Exchange Binding", "Routing key and Queue names",
+ _bindingItemNames, _bindingItemNames, _bindingItemTypes);
+ _bindinglistDataType = new TabularType("Exchange Bindings", "Exchange Bindings for " + getName(),
+ _bindingDataType, _bindingItemIndexNames);
}
- public TabularData viewBindings()
- throws OpenDataException
+ /**
+ * returns exchange bindings in tabular form
+ */
+ public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
-
for (Map.Entry<String, List<AMQQueue>> entry : _routingKey2queues.entrySet())
{
String key = entry.getKey();
@@ -114,20 +99,16 @@ public class DestWildExchange extends AbstractExchange
}
Object[] bindingItemValues = {key, queueList.toArray(new String[0])};
- CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
- _bindingItemNames,
- bindingItemValues);
+ CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
return _bindingList;
}
- public void createBinding(String queueName, String binding)
- throws JMException
+ public void createNewBinding(String queueName, String binding) throws JMException
{
AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
-
if (queue == null)
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
@@ -217,10 +198,10 @@ public class DestWildExchange extends AbstractExchange
{
return new DestWildExchangeMBean();
}
- catch (NotCompliantMBeanException ex)
+ catch (JMException ex)
{
- _logger.error("Exception occured in creating the DestWildExchenge", ex);
- throw new AMQException("Exception occured in creating the DestWildExchenge", ex);
+ _logger.error("Exception occured in creating the topic exchenge mbean", ex);
+ throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 961d4ddf4c..586d6b8796 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import javax.management.JMException;
-import javax.management.NotCompliantMBeanException;
import javax.management.openmbean.*;
import java.util.ArrayList;
import java.util.Iterator;
@@ -80,53 +79,39 @@ public class HeadersExchange extends AbstractExchange
@MBeanDescription("Management Bean for Headers Exchange")
private final class HeadersExchangeMBean extends ExchangeMBean
{
- private String[] _bindingItemNames = {"Queue", "HeaderBinding"};
- private String[] _bindingItemDescriptions = {"Queue Name", "Header attribute bindings"};
- private String[] _bindingItemIndexNames = {"HeaderBinding"};
- private OpenType[] _bindingItemTypes = new OpenType[2];
-
+ // open mbean data types for representing exchange bindings
+ private String[] _bindingItemNames = {"S.No.", "Queue Name", "Header Bindings"};
+ private String[] _bindingItemIndexNames = {_bindingItemNames[0]};
+ private OpenType[] _bindingItemTypes = new OpenType[3];
private CompositeType _bindingDataType = null;
private TabularType _bindinglistDataType = null;
private TabularDataSupport _bindingList = null;
@MBeanConstructor("Creates an MBean for AMQ Headers exchange")
- public HeadersExchangeMBean() throws NotCompliantMBeanException
+ public HeadersExchangeMBean() throws JMException
{
super();
+ _exchangeType = "headers";
init();
}
/**
* initialises the OpenType objects.
*/
- private void init()
+ private void init() throws OpenDataException
{
- try
- {
- _bindingItemTypes[0] = SimpleType.STRING;
- _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
-
- _bindingDataType = new CompositeType("QueueAndHeaderAttributesBinding",
- "Queue name and header bindings",
- _bindingItemNames,
- _bindingItemDescriptions,
- _bindingItemTypes);
- _bindinglistDataType = new TabularType("HeaderBindings",
- "List of queue bindings for " + getName(),
- _bindingDataType,
- _bindingItemIndexNames);
- }
- catch(OpenDataException ex)
- {
- //It should never occur.
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _bindingItemTypes[0] = SimpleType.INTEGER;
+ _bindingItemTypes[1] = SimpleType.STRING;
+ _bindingItemTypes[2] = new ArrayType(1, SimpleType.STRING);
+ _bindingDataType = new CompositeType("Exchange Binding", "Queue name and header bindings",
+ _bindingItemNames, _bindingItemNames, _bindingItemTypes);
+ _bindinglistDataType = new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(),
+ _bindingDataType, _bindingItemIndexNames);
}
- public TabularData viewBindings()
- throws OpenDataException
+ public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
+ int count = 1;
for (Iterator<Registration> itr = _bindings.iterator(); itr.hasNext();)
{
Registration registration = itr.next();
@@ -144,10 +129,8 @@ public class HeadersExchange extends AbstractExchange
mappingList.add(key + "=" + value);
}
- Object[] bindingItemValues = {queueName, mappingList.toArray(new String[0])};
- CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
- _bindingItemNames,
- bindingItemValues);
+ Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
+ CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -161,8 +144,7 @@ public class HeadersExchange extends AbstractExchange
* @param binding
* @throws JMException
*/
- public void createBinding(String queueName, String binding)
- throws JMException
+ public void createNewBinding(String queueName, String binding) throws JMException
{
AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
@@ -240,7 +222,7 @@ public class HeadersExchange extends AbstractExchange
{
return new HeadersExchangeMBean();
}
- catch (NotCompliantMBeanException ex)
+ catch (JMException ex)
{
_logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java
index 3c2c105186..b61aa233f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.server.management.MBeanAttribute;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.queue.ManagedQueue;
import javax.management.openmbean.TabularData;
import javax.management.JMException;
@@ -44,9 +45,12 @@ public interface ManagedExchange
* @return the name of the exchange.
* @throws IOException
*/
- @MBeanAttribute(name="Name", description="Name of exchange")
+ @MBeanAttribute(name="Name", description=TYPE + " Name")
String getName() throws IOException;
+ @MBeanAttribute(name="ExchangeType", description="Exchange Type")
+ String getExchangeType() throws IOException;
+
@MBeanAttribute(name="TicketNo", description="Exchange Ticket No")
Integer getTicketNo() throws IOException;
@@ -74,8 +78,8 @@ public interface ManagedExchange
* @throws IOException
* @throws JMException
*/
- @MBeanOperation(name="viewBindings", description="view the queue bindings for this exchange")
- TabularData viewBindings() throws IOException, JMException;
+ @MBeanOperation(name="bindings", description="view the queue bindings for this exchange")
+ TabularData bindings() throws IOException, JMException;
/**
* Creates new binding with the given queue and binding.
@@ -83,11 +87,11 @@ public interface ManagedExchange
* @param binding
* @throws JMException
*/
- @MBeanOperation(name="createBinding",
- description="create a new binding with this exchange",
- impact= MBeanOperationInfo.ACTION)
- void createBinding(@MBeanOperationParameter(name="queue name", description="queue name") String queueName,
- @MBeanOperationParameter(name="binding", description="queue binding")String binding)
+ @MBeanOperation(name="createNewBinding",
+ description="create a new binding with this exchange",
+ impact= MBeanOperationInfo.ACTION)
+ void createNewBinding(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue name") String queueName,
+ @MBeanOperationParameter(name="Binding", description="New binding")String binding)
throws JMException;
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java b/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
index 266fb62fd8..aec7d6cb73 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.management;
+import org.apache.qpid.server.exchange.ManagedExchange;
+import org.apache.qpid.server.queue.ManagedQueue;
+
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
import java.io.IOException;
@@ -45,10 +48,9 @@ public interface ManagedBroker
* @throws IOException
* @throws JMException
*/
- @MBeanOperation(name="createNewExchange", description="Creates a new Exchange",
- impact= MBeanOperationInfo.ACTION)
+ @MBeanOperation(name="createNewExchange", description="Creates a new Exchange", impact= MBeanOperationInfo.ACTION)
void createNewExchange(@MBeanOperationParameter(name="name", description="Name of the new exchange")String name,
- @MBeanOperationParameter(name="excahnge type", description="Type of the exchange")String type,
+ @MBeanOperationParameter(name="ExchangeType", description="Type of the exchange")String type,
@MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable,
@MBeanOperationParameter(name="passive", description="true of the Exchange should be passive")boolean passive)
throws IOException, JMException;
@@ -61,9 +63,9 @@ public interface ManagedBroker
* @throws JMException
*/
@MBeanOperation(name="unregisterExchange",
- description="Unregisters all the related channels and queuebindings of this exchange",
- impact= MBeanOperationInfo.ACTION)
- void unregisterExchange(@MBeanOperationParameter(name="exchange name", description="Name of the exchange")String exchange)
+ description="Unregisters all the related channels and queuebindings of this exchange",
+ impact= MBeanOperationInfo.ACTION)
+ void unregisterExchange(@MBeanOperationParameter(name= ManagedExchange.TYPE, description="Exchange Name")String exchange)
throws IOException, JMException;
/**
@@ -75,12 +77,11 @@ public interface ManagedBroker
* @throws IOException
* @throws JMException
*/
- @MBeanOperation(name="createQueue", description="Create a new Queue on the Broker server",
- impact= MBeanOperationInfo.ACTION)
- void createQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName,
- @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable,
- @MBeanOperationParameter(name="owner", description="Owner name")String owner,
- @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete)
+ @MBeanOperation(name="createNewQueue", description="Create a new Queue on the Broker server", impact= MBeanOperationInfo.ACTION)
+ void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName,
+ @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable,
+ @MBeanOperationParameter(name="owner", description="Owner name")String owner,
+ @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete)
throws IOException, JMException;
/**
@@ -93,6 +94,6 @@ public interface ManagedBroker
@MBeanOperation(name="deleteQueue",
description="Unregisters the Queue bindings, removes the subscriptions and deletes the queue",
impact= MBeanOperationInfo.ACTION)
- void deleteQueue(@MBeanOperationParameter(name="queue name", description="Name of the queue")String queueName)
+ void deleteQueue(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue Name")String queueName)
throws IOException, JMException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 966af77d64..be8d2c4c82 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -52,7 +52,6 @@ import org.apache.qpid.server.state.AMQStateManager;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
-import javax.management.NotCompliantMBeanException;
import javax.management.Notification;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
@@ -122,66 +121,35 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* augment the ManagedConnection interface and add the appropriate implementation here.
*/
@MBeanDescription("Management Bean for an AMQ Broker Connection")
- private final class ManagedAMQProtocolSession extends AMQManagedObject
- implements ManagedConnection
+ private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
{
private String _name = null;
- /**
- * Represents the channel attributes sent with channel data.
- */
- private String[] _channelAtttibuteNames = { "ChannelId",
- "Transactional",
- "DefaultQueue",
- "UnacknowledgedMessageCount"};
- private String[] _channelAttributeDescriptions = { "Channel Identifier",
- "is Channel Transactional?",
- "Default Queue Name",
- "Unacknowledged Message Count"};
- private OpenType[] _channelAttributeTypes = { SimpleType.INTEGER,
- SimpleType.BOOLEAN,
- SimpleType.STRING,
- SimpleType.INTEGER};
-
- private String[] _indexNames = { "ChannelId" }; //Channels in the list will be indexed according to channelId.
- private CompositeType _channelType = null; // represents the data type for channel data
- private TabularType _channelsType = null; // Datatype for list of channelsType
+ //openmbean data types for representing the channel attributes
+ private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
+ private String[] _indexNames = {_channelAtttibuteNames[0]};
+ private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
+ private CompositeType _channelType = null; // represents the data type for channel data
+ private TabularType _channelsType = null; // Data type for list of channels type
private TabularDataSupport _channelsList = null;
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public ManagedAMQProtocolSession() throws NotCompliantMBeanException
+ public ManagedAMQProtocolSession() throws JMException
{
super(ManagedConnection.class, ManagedConnection.TYPE);
init();
}
/**
- * initialises the CompositeTypes and TabularType attributes.
+ * initialises the openmbean data types
*/
- private void init()
+ private void init() throws OpenDataException
{
String remote = getRemoteAddress();
remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
-
- try
- {
- _channelType = new CompositeType("channel",
- "Channel Details",
- _channelAtttibuteNames,
- _channelAttributeDescriptions,
- _channelAttributeTypes);
-
- _channelsType = new TabularType("channelsType",
- "List of available channels",
- _channelType,
- _indexNames);
- }
- catch(OpenDataException ex)
- {
- // It should never occur.
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
+ _channelAtttibuteNames, _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
public Date getLastIoTime()
@@ -204,12 +172,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _minaProtocolSession.getReadBytes();
}
- public Long getMaximumNumberOfAllowedChannels()
+ public Long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
}
- public void setMaximumNumberOfAllowedChannels(Long value)
+ public void setMaximumNumberOfChannels(Long value)
{
_maxNoOfChannels = value;
}
@@ -264,30 +232,25 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* @return list of channels in tabular form.
* @throws OpenDataException
*/
- public TabularData getChannels() throws OpenDataException
+ public TabularData channels() throws OpenDataException
{
_channelsList = new TabularDataSupport(_channelsType);
for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
{
AMQChannel channel = entry.getValue();
- Object[] itemValues = {channel.getChannelId(),
- channel.isTransactional(),
+ Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
(channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
channel.getUnacknowledgedMessageMap().size()};
- CompositeData channelData = new CompositeDataSupport(_channelType,
- _channelAtttibuteNames,
- itemValues);
-
+ CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
_channelsList.put(channelData);
}
return _channelsList;
}
-
- public void closeChannel(int id)
- throws Exception
+
+ public void closeChannel(int id) throws Exception
{
try
{
@@ -299,8 +262,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- public void closeConnection()
- throws Exception
+ public void closeConnection() throws Exception
{
try
{
@@ -315,13 +277,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]
- {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
- String description = "An attribute of this MBean has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
- name,
- description);
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
return new MBeanNotificationInfo[] {info1};
}
@@ -329,14 +288,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private void checkForNotification()
{
int channelsCount = _channelMap.size();
- if (channelsCount >= getMaximumNumberOfAllowedChannels())
+ if (channelsCount >= getMaximumNumberOfChannels())
{
- Notification n = new Notification(
- MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
- this,
- ++_notificationSequenceNumber,
- System.currentTimeMillis(),
- "ChannelsCount = " + channelsCount + ", ChannelsCount has reached the threshold value");
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(),
+ "Channel count (" + channelsCount + ") has reached the threshold value");
_broadcaster.sendNotification(n);
}
@@ -372,10 +328,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return new ManagedAMQProtocolSession();
}
- catch(NotCompliantMBeanException ex)
+ catch(JMException ex)
{
- _logger.error("AMQProtocolSession MBean creation has failed.", ex);
- throw new AMQException("AMQProtocolSession MBean creation has failed.", ex);
+ _logger.error("AMQProtocolSession MBean creation has failed ", ex);
+ throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
index 889acd0142..2f3102b048 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
@@ -41,83 +41,57 @@ public interface ManagedConnection
static final String TYPE = "Connection";
/**
- * channel details of all the channels opened for this connection.
- * @return general channel details
- * @throws IOException
- * @throws JMException
+ * Tells the remote address of this connection.
+ * @return remote address
*/
- @MBeanAttribute(name="Channels",
- description="channel details of all the channels opened for this connection")
- TabularData getChannels() throws IOException, JMException;
+ @MBeanAttribute(name="RemoteAddress", description=TYPE + " Address")
+ String getRemoteAddress();
/**
* Tells the last time, the IO operation was done.
* @return last IO time.
*/
- @MBeanAttribute(name="LastIOTime",
- description="The last time, the IO operation was done")
+ @MBeanAttribute(name="LastIOTime", description="The last time, the IO operation was done")
Date getLastIoTime();
/**
- * Tells the remote address of this connection.
- * @return remote address
- */
- @MBeanAttribute(name="RemoteAddress",
- description="The remote address of this connection")
- String getRemoteAddress();
-
- /**
* Tells the total number of bytes written till now.
* @return number of bytes written.
*/
- @MBeanAttribute(name="WrittenBytes",
- description="The total number of bytes written till now")
+ @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
Long getWrittenBytes();
/**
* Tells the total number of bytes read till now.
* @return number of bytes read.
*/
- @MBeanAttribute(name="ReadBytes",
- description="The total number of bytes read till now")
+ @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
Long getReadBytes();
/**
- * Tells the maximum number of channels that can be opened using
- * this connection. This is useful in setting notifications or
+ * Threshold high value for no of channels. This is useful in setting notifications or
* taking required action is there are more channels being created.
- * @return maximum number of channels allowed to be created.
+ * @return threshold limit for no of channels
*/
- Long getMaximumNumberOfAllowedChannels();
+ Long getMaximumNumberOfChannels();
/**
- * Sets the maximum number of channels allowed to be created using
- * this connection.
+ * Sets the threshold high value for number of channels for a connection
* @param value
*/
- @MBeanAttribute(name="MaximumNumberOfAllowedChannels",
- description="The maximum number of channels that can be opened using this connection")
- void setMaximumNumberOfAllowedChannels(Long value);
+ @MBeanAttribute(name="MaximumNumberOfChannels", description="The threshold high value for number of channels for this connection")
+ void setMaximumNumberOfChannels(Long value);
//********** Operations *****************//
/**
- * Closes all the related channels and unregisters this connection from managed objects.
- */
- @MBeanOperation(name="closeConnection",
- description="Closes this connection and all related channels",
- impact= MBeanOperationInfo.ACTION)
- void closeConnection() throws Exception;
-
- /**
- * Unsubscribes the consumers and unregisters the channel from managed objects.
+ * channel details of all the channels opened for this connection.
+ * @return general channel details
+ * @throws IOException
+ * @throws JMException
*/
- @MBeanOperation(name="closeChannel",
- description="Closes the channel with given channeld and" +
- "connected consumers will be unsubscribed",
- impact= MBeanOperationInfo.ACTION)
- void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
- throws Exception;
+ @MBeanOperation(name="channels", description="Channel details for this connection")
+ TabularData channels() throws IOException, JMException;
/**
* Commits the transactions if the channel is transactional.
@@ -125,8 +99,8 @@ public interface ManagedConnection
* @throws JMException
*/
@MBeanOperation(name="commitTransaction",
- description="Commits the transactions for given channelID, if the channel is transactional",
- impact= MBeanOperationInfo.ACTION)
+ description="Commits the transactions for given channel Id, if the channel is transactional",
+ impact= MBeanOperationInfo.ACTION)
void commitTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
/**
@@ -135,7 +109,24 @@ public interface ManagedConnection
* @throws JMException
*/
@MBeanOperation(name="rollbackTransactions",
- description="Rollsback the transactions for given channelId, if the channel is transactional",
- impact= MBeanOperationInfo.ACTION)
+ description="Rollsback the transactions for given channel Id, if the channel is transactional",
+ impact= MBeanOperationInfo.ACTION)
void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
+
+ /**
+ * Unsubscribes the consumers and unregisters the channel from managed objects.
+ */
+ @MBeanOperation(name="closeChannel",
+ description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
+ impact= MBeanOperationInfo.ACTION)
+ void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
+ throws Exception;
+
+ /**
+ * Closes all the related channels and unregisters this connection from managed objects.
+ */
+ @MBeanOperation(name="closeConnection",
+ description="Closes this connection and all related channels",
+ impact= MBeanOperationInfo.ACTION)
+ void closeConnection() throws Exception;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 05fe8908c3..353a2007c0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -38,7 +39,6 @@ import org.apache.qpid.server.txn.TxnOp;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
-import javax.management.NotCompliantMBeanException;
import javax.management.Notification;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.*;
@@ -100,19 +100,19 @@ public class AMQQueue implements Managable, Comparable
private final AMQQueueMBean _managedObject;
/**
- * max allowed size of a single message(in KBytes).
+ * max allowed size(KB) of a single message
*/
- private long _maxAllowedMessageSize = 10000; // 10 MB
+ private long _maxMessageSize = 10000;
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxAllowedMessageCount = 10000;
+ private Integer _maxMessageCount = 10000;
/**
- * max allowed size in KBytes for all the messages combined together in a queue.
+ * max queue depth(KB) for the queue
*/
- private long _queueDepth = 10000000; // 10 GB
+ private long _maxQueueDepth = 10000000;
/**
* total messages received by the queue since startup.
@@ -132,83 +132,45 @@ public class AMQQueue implements Managable, Comparable
private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
private String _queueName = null;
+ // OpenMBean data types for viewMessages method
+ private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
+ private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+ private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+ private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
- // AMQ message attribute names
- private String[] _msgAttributeNames = {"MessageId",
- "Header",
- "Size",
- "Redelivered"
- };
- // AMQ Message attribute descriptions.
- private String[] _msgAttributeDescriptions = {"Message Id",
- "Header",
- "Message size in bytes",
- "Redelivered"
- };
-
- private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private String[] _msgAttributeIndex = {"MessageId"}; // Messages will be indexed according to the messageId.
- private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
-
- private CompositeType _msgContentType = null; // For message content
- private String[] _msgContentAttributes = {"MessageId",
- "MimeType",
- "Encoding",
- "Content"
- };
- private String[] _msgContentDescriptions = {"Message Id",
- "MimeType",
- "Encoding",
- "Message content"
- };
- private OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
-
- @MBeanConstructor("Creates an MBean exposing an AMQQueue.")
- public AMQQueueMBean() throws NotCompliantMBeanException
+ // OpenMBean data types for viewMessageContent method
+ private CompositeType _msgContentType = null;
+ private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
+
+ @MBeanConstructor("Creates an MBean exposing an AMQQueue")
+ public AMQQueueMBean() throws JMException
{
super(ManagedQueue.class, ManagedQueue.TYPE);
init();
}
- private void init()
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
{
_queueName = jmxEncode(new StringBuffer(_name), 0).toString();
- try
- {
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("MessageContent",
- "AMQ Message Content",
- _msgContentAttributes,
- _msgContentDescriptions,
- _msgContentAttributeTypes);
-
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
-
- _messageDataType = new CompositeType("Message",
- "AMQ Message",
- _msgAttributeNames,
- _msgAttributeDescriptions,
- _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages",
- "List of messages",
- _messageDataType,
- _msgAttributeIndex);
- }
- catch (OpenDataException ex)
- {
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
+ _msgContentAttributes, _msgContentAttributeTypes);
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+
+ _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
}
public String getObjectInstanceName()
@@ -243,12 +205,12 @@ public class AMQQueue implements Managable, Comparable
public Long getMaximumMessageSize()
{
- return _maxAllowedMessageSize;
+ return _maxMessageSize;
}
public void setMaximumMessageSize(Long value)
{
- _maxAllowedMessageSize = value;
+ _maxMessageSize = value;
}
public Integer getConsumerCount()
@@ -268,27 +230,29 @@ public class AMQQueue implements Managable, Comparable
public Integer getMaximumMessageCount()
{
- return _maxAllowedMessageCount;
+ return _maxMessageCount;
}
public void setMaximumMessageCount(Integer value)
{
- _maxAllowedMessageCount = value;
+ _maxMessageCount = value;
}
- public Long getQueueDepth()
+ public Long getMaximumQueueDepth()
{
- return _queueDepth;
+ return _maxQueueDepth;
}
// Sets the queue depth, the max queue size
- public void setQueueDepth(Long value)
+ public void setMaximumQueueDepth(Long value)
{
- _queueDepth = value;
+ _maxQueueDepth = value;
}
- // Returns the size of messages in the queue
- public Long getQueueSize()
+ /**
+ * returns the size of messages(KB) in the queue.
+ */
+ public Long getQueueDepth()
{
List<AMQMessage> list = _deliveryMgr.getMessages();
if (list.size() == 0)
@@ -296,15 +260,17 @@ public class AMQQueue implements Managable, Comparable
return 0l;
}
- long queueSize = 0;
+ long queueDepth = 0;
for (AMQMessage message : list)
{
- queueSize = queueSize + getMessageSize(message);
+ queueDepth = queueDepth + getMessageSize(message);
}
- return new Long(Math.round(queueSize / 100));
+ return (long)Math.round(queueDepth / 1000);
}
- // calculates the size of an AMQMessage
+ /**
+ * returns size of message in bytes
+ */
private long getMessageSize(AMQMessage msg)
{
if (msg == null)
@@ -312,53 +278,43 @@ public class AMQQueue implements Managable, Comparable
return 0l;
}
- List<ContentBody> cBodies = msg.getContentBodies();
- long messageSize = 0;
- for (ContentBody body : cBodies)
- {
- if (body != null)
- {
- messageSize = messageSize + body.getSize();
- }
- }
- return messageSize;
+ return msg.getContentHeaderBody().bodySize;
}
- // Checks if there is any notification to be send to the listeners
+ /**
+ * Checks if there is any notification to be send to the listeners
+ */
private void checkForNotification(AMQMessage msg)
{
- // Check for message count
+ // Check for threshold message count
Integer msgCount = getMessageCount();
if (msgCount >= getMaximumMessageCount())
{
- notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+ notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
}
- // Check for received message size
+ // Check for threshold message size
long messageSize = getMessageSize(msg);
- if (messageSize >= getMaximumMessageSize())
+ if (messageSize >= _maxMessageSize)
{
- notifyClients("MessageSize = " + messageSize + ", Message size (MessageID=" + msg.getMessageId() +
- ")is higher than the threshold value");
+ notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
}
- // Check for queue size in bytes
- long queueSize = getQueueSize();
- if (queueSize >= getQueueDepth())
+ // Check for threshold queue depth in bytes
+ long queueDepth = getQueueDepth();
+ if (queueDepth >= _maxQueueDepth)
{
- notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+ notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
}
}
- // Send the notification to the listeners
+ /**
+ * Sends the notification to the listeners
+ */
private void notifyClients(String notificationMsg)
{
- Notification n = new Notification(
- MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
- this,
- ++_notificationSequenceNumber,
- System.currentTimeMillis(),
- notificationMsg);
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
@@ -387,10 +343,12 @@ public class AMQQueue implements Managable, Comparable
}
}
+ /**
+ * returns message content as byte array and related attributes for the given message id.
+ */
public CompositeData viewMessageContent(long msgId) throws JMException
{
List<AMQMessage> list = _deliveryMgr.getMessages();
- CompositeData messageContent = null;
AMQMessage msg = null;
for (AMQMessage message : list)
{
@@ -401,77 +359,55 @@ public class AMQQueue implements Managable, Comparable
}
}
- if (msg != null)
+ if (msg == null)
{
- // get message content
- List<ContentBody> cBodies = msg.getContentBodies();
- List<Byte> msgContent = new ArrayList<Byte>();
- for (ContentBody body : cBodies)
+ throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName );
+ }
+ // get message content
+ List<ContentBody> cBodies = msg.getContentBodies();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ for (ContentBody body : cBodies)
+ {
+ if (body.getSize() != 0)
{
- if (body.getSize() != 0)
+ ByteBuffer slice = body.payload.slice();
+ for (int j = 0; j < slice.limit(); j++)
{
- ByteBuffer slice = body.payload.slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
+ msgContent.add(slice.get());
}
}
-
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = headerProperties.getContentType();
- String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
-
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
- messageContent = new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
- else
- {
- throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
- return messageContent;
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+ String mimeType = headerProperties.getContentType();
+ String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
/**
- * Returns the messages stored in this queue in tabular form.
- *
- * @param beginIndex
- * @param endIndex
- * @return AMQ messages in tabular form.
- * @throws JMException
+ * Returns the header contents of the messages stored in this queue in tabular form.
*/
public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
{
if ((beginIndex > endIndex) || (beginIndex < 1))
{
- throw new JMException("FromIndex = " + beginIndex + ", ToIndex = " + endIndex +
- "\nFromIndex should be greater than 0 and less than ToIndex");
+ throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex +
+ "\nFrom Index should be greater than 0 and less than To Index");
}
List<AMQMessage> list = _deliveryMgr.getMessages();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
- if (beginIndex > list.size())
- {
- return _messageList;
- }
- endIndex = endIndex < list.size() ? endIndex : list.size();
-
- for (int i = beginIndex; i <= endIndex; i++)
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
{
AMQMessage msg = list.get(i - 1);
- long size = 0;
- // get message content
- List<ContentBody> cBodies = msg.getContentBodies();
- for (ContentBody body : cBodies)
- {
- size = size + body.getSize();
- }
-
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties;
List<String> headerAttribsList = new ArrayList<String>();
headerAttribsList.add("App Id=" + headerProperties.getAppId());
headerAttribsList.add("MimeType=" + headerProperties.getContentType());
@@ -479,13 +415,9 @@ public class AMQQueue implements Managable, Comparable
headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
headerAttribsList.add(headerProperties.toString());
- Object[] itemValues = {msg.getMessageId(),
- headerAttribsList.toArray(new String[0]),
- size, msg.isRedelivered()};
-
- CompositeData messageData = new CompositeDataSupport(_messageDataType,
- _msgAttributeNames,
- itemValues);
+ Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]),
+ headerBody.bodySize, msg.isRedelivered()};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
}
@@ -493,20 +425,15 @@ public class AMQQueue implements Managable, Comparable
}
/**
- * Creates all the notifications this MBean can send.
- *
- * @return Notifications broadcasted by this MBean.
+ * returns Notifications sent by this MBean.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]
- {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
- String description = "An attribute of this MBean has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
- name,
- description);
+ String description = "Either Message count or Queue depth or Message size has reached threshold high value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
return new MBeanNotificationInfo[]{info1};
}
@@ -608,9 +535,9 @@ public class AMQQueue implements Managable, Comparable
{
return new AMQQueueMBean();
}
- catch (NotCompliantMBeanException ex)
+ catch (JMException ex)
{
- throw new AMQException("AMQQueue MBean creation has failed.", ex);
+ throw new AMQException("AMQQueue MBean creation has failed ", ex);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
index 3a818cf31a..de5d0f55a7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
@@ -45,16 +45,48 @@ public interface ManagedQueue
* @return the name of the managedQueue.
* @throws IOException
*/
- @MBeanAttribute(name="Name", description = "Name of the " + TYPE)
+ @MBeanAttribute(name="Name", description = TYPE + " Name")
String getName() throws IOException;
/**
- * Tells whether this ManagedQueue is durable or not.
- * @return true if this ManagedQueue is a durable queue.
+ * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
+ * @return number of undelivered message in the Queue.
* @throws IOException
*/
- @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
- boolean isDurable() throws IOException;
+ @MBeanAttribute(name="MessageCount", description = "Total number of undelivered messages on the queue")
+ Integer getMessageCount() throws IOException;
+
+ /**
+ * Tells the total number of messages receieved by the queue since startup.
+ * @return total number of messages received.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ReceivedMessageCount", description="The total number of messages receieved by the queue since startup")
+ Long getReceivedMessageCount() throws IOException;
+
+ /**
+ * Size of messages in the queue
+ * @return
+ * @throws IOException
+ */
+ @MBeanAttribute(name="QueueDepth", description="Size of messages(KB) in the queue")
+ Long getQueueDepth() throws IOException;
+
+ /**
+ * Returns the total number of active subscribers to the queue.
+ * @return the number of active subscribers
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
+ Integer getActiveConsumerCount() throws IOException;
+
+ /**
+ * Returns the total number of subscribers to the queue.
+ * @return the number of subscribers.
+ * @throws IOException
+ */
+ @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
+ Integer getConsumerCount() throws IOException;
/**
* Tells the Owner of the ManagedQueue.
@@ -65,21 +97,20 @@ public interface ManagedQueue
String getOwner() throws IOException;
/**
- * Tells if the ManagedQueue is set to AutoDelete.
- * @return true if the ManagedQueue is set to AutoDelete.
+ * Tells whether this ManagedQueue is durable or not.
+ * @return true if this ManagedQueue is a durable queue.
* @throws IOException
*/
- @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
- boolean isAutoDelete() throws IOException;
+ @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
+ boolean isDurable() throws IOException;
/**
- * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
- * @return number of undelivered message in the Queue.
+ * Tells if the ManagedQueue is set to AutoDelete.
+ * @return true if the ManagedQueue is set to AutoDelete.
* @throws IOException
*/
- @MBeanAttribute(name="MessageCount",
- description = "Total number of undelivered messages on the queue")
- Integer getMessageCount() throws IOException;
+ @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
+ boolean isAutoDelete() throws IOException;
/**
* Returns the maximum size of a message (in kbytes) allowed to be accepted by the
@@ -98,36 +129,10 @@ public interface ManagedQueue
* @param size maximum size of message.
* @throws IOException
*/
- @MBeanAttribute(name="MaximumMessageSize",
- description="Maximum size(KB) of a message allowed for this Queue")
+ @MBeanAttribute(name="MaximumMessageSize", description="Threshold high value(KB) for a message size")
void setMaximumMessageSize(Long size) throws IOException;
/**
- * Returns the total number of subscribers to the queue.
- * @return the number of subscribers.
- * @throws IOException
- */
- @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
- Integer getConsumerCount() throws IOException;
-
- /**
- * Returns the total number of active subscribers to the queue.
- * @return the number of active subscribers
- * @throws IOException
- */
- @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
- Integer getActiveConsumerCount() throws IOException;
-
- /**
- * Tells the total number of messages receieved by the queue since startup.
- * @return total number of messages received.
- * @throws IOException
- */
- @MBeanAttribute(name="ReceivedMessageCount",
- description="The total number of messages receieved by the queue since startup")
- Long getReceivedMessageCount() throws IOException;
-
- /**
* Tells the maximum number of messages that can be stored in the queue.
* This is useful in setting the notifications or taking required
* action is the number of message increase this limit.
@@ -141,27 +146,16 @@ public interface ManagedQueue
* @param value the maximum number of messages allowed to be stored in the queue.
* @throws IOException
*/
- @MBeanAttribute(name="MaximumMessageCount",
- description="The maximum number of messages allowed to be stored in the queue")
+ @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue")
void setMaximumMessageCount(Integer value) throws IOException;
/**
- * Size of messages in the queue
- * @return
- * @throws IOException
- */
- @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
- Long getQueueSize() throws IOException;
-
- /**
- * Tells the maximum size of all the messages combined together,
- * that can be stored in the queue. This is useful for setting notifications
- * or taking required action if the size of messages stored in the queue
- * increases over this limit.
- * @return maximum size of the all the messages allowed for the queue.
+ * This is useful for setting notifications or taking required action if the size of messages
+ * stored in the queue increases over this limit.
+ * @return threshold high value for Queue Depth
* @throws IOException
*/
- Long getQueueDepth() throws IOException;
+ Long getMaximumQueueDepth() throws IOException;
/**
* Sets the maximum size of all the messages together, that can be stored
@@ -169,9 +163,8 @@ public interface ManagedQueue
* @param value
* @throws IOException
*/
- @MBeanAttribute(name="QueueDepth",
- description="The size(KB) of all the messages together, that can be stored in the queue")
- void setQueueDepth(Long value) throws IOException;
+ @MBeanAttribute(name="MaximumQueueDepth", description="The threshold high value(KB) for Queue Depth")
+ void setMaximumQueueDepth(Long value) throws IOException;
@@ -188,19 +181,22 @@ public interface ManagedQueue
* @throws JMException
*/
@MBeanOperation(name="viewMessages",
- description="shows messages in this queue with given indexes. eg. from index 1 - 100")
+ description="Message headers for messages in this queue within given index range. eg. from index 1 - 100")
TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex,
@MBeanOperationParameter(name="to index", description="to index")int toIndex)
throws IOException, JMException;
+ @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id")
+ CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
+ throws IOException, JMException;
+
/**
* Deletes the first message from top.
* @throws IOException
* @throws JMException
*/
- @MBeanOperation(name="deleteMessageFromTop",
- description="Deletes the first message from top",
- impact= MBeanOperationInfo.ACTION)
+ @MBeanOperation(name="deleteMessageFromTop", description="Deletes the first message from top",
+ impact= MBeanOperationInfo.ACTION)
void deleteMessageFromTop() throws IOException, JMException;
/**
@@ -209,12 +205,8 @@ public interface ManagedQueue
* @throws JMException
*/
@MBeanOperation(name="clearQueue",
- description="Clears the queue by deleting all the undelivered messages from the queue",
- impact= MBeanOperationInfo.ACTION)
+ description="Clears the queue by deleting all the undelivered messages from the queue",
+ impact= MBeanOperationInfo.ACTION)
void clearQueue() throws IOException, JMException;
- @MBeanOperation(name="viewMessageContent",
- description="Returns the message content along with MimeType and Encoding")
- CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
- throws IOException, JMException;
}