summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java261
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java240
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java429
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java343
4 files changed, 734 insertions, 539 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1aa62dbfa4..407ad236ea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -40,9 +40,6 @@ import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -50,24 +47,12 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
import javax.security.sasl.SaslServer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Date;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -93,7 +78,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private AMQCodecFactory _codecFactory;
- private ManagedAMQProtocolSession _managedObject;
+ private AMQProtocolSessionMBean _managedObject;
private SaslServer _saslServer;
@@ -102,11 +87,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private Object _lastSent;
private boolean _closed;
-
+ // maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
-
private byte _major;
private byte _minor;
@@ -115,190 +99,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _managedObject;
}
- /**
- * This class implements the management interface (is an MBean). In order to
- * make more attributes, operations and notifications available over JMX simply
- * augment the ManagedConnection interface and add the appropriate implementation here.
- */
- @MBeanDescription("Management Bean for an AMQ Broker Connection")
- private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
- {
- private String _name = null;
- //openmbean data types for representing the channel attributes
- private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
- private String[] _indexNames = {_channelAtttibuteNames[0]};
- private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
- private CompositeType _channelType = null; // represents the data type for channel data
- private TabularType _channelsType = null; // Data type for list of channels type
- private TabularDataSupport _channelsList = null;
-
- @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public ManagedAMQProtocolSession() throws JMException
- {
- super(ManagedConnection.class, ManagedConnection.TYPE);
- init();
- }
-
- /**
- * initialises the openmbean data types
- */
- private void init() throws OpenDataException
- {
- String remote = getRemoteAddress();
- remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
- _name = jmxEncode(new StringBuffer(remote), 0).toString();
- _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
- _channelAtttibuteNames, _channelAttributeTypes);
- _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
- }
-
- public Date getLastIoTime()
- {
- return new Date(_minaProtocolSession.getLastIoTime());
- }
-
- public String getRemoteAddress()
- {
- return _minaProtocolSession.getRemoteAddress().toString();
- }
-
- public Long getWrittenBytes()
- {
- return _minaProtocolSession.getWrittenBytes();
- }
-
- public Long getReadBytes()
- {
- return _minaProtocolSession.getReadBytes();
- }
-
- public Long getMaximumNumberOfChannels()
- {
- return _maxNoOfChannels;
- }
-
- public void setMaximumNumberOfChannels(Long value)
- {
- _maxNoOfChannels = value;
- }
-
- public String getObjectInstanceName()
- {
- return _name;
- }
-
- public void commitTransactions(int channelId) throws JMException
- {
- try
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
- }
- if (channel.isTransactional())
- {
- channel.commit();
- }
- }
- catch(AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- public void rollbackTransactions(int channelId) throws JMException
- {
- try
- {
- AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
- }
- if (channel.isTransactional())
- {
- channel.rollback();
- }
- }
- catch(AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * Creates the list of channels in tabular form from the _channelMap.
- * @return list of channels in tabular form.
- * @throws OpenDataException
- */
- public TabularData channels() throws OpenDataException
- {
- _channelsList = new TabularDataSupport(_channelsType);
-
- for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
- {
- AMQChannel channel = entry.getValue();
- Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
- (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
- channel.getUnacknowledgedMessageMap().size()};
-
- CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
- _channelsList.put(channelData);
- }
-
- return _channelsList;
- }
-
- public void closeChannel(int id) throws Exception
- {
- try
- {
- AMQMinaProtocolSession.this.closeChannel(id);
- }
- catch (AMQException ex)
- {
- throw new Exception(ex.toString());
- }
- }
-
- public void closeConnection() throws Exception
- {
- try
- {
- AMQMinaProtocolSession.this.closeSession();
- }
- catch (AMQException ex)
- {
- throw new Exception(ex.toString());
- }
- }
-
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
- String name = MonitorNotification.class.getName();
- String description = "Channel count has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[] {info1};
- }
-
- private void checkForNotification()
- {
- int channelsCount = _channelMap.size();
- if (channelsCount >= getMaximumNumberOfChannels())
- {
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(),
- "Channel count (" + channelsCount + ") has reached the threshold value");
-
- _broadcaster.sendNotification(n);
- }
- }
-
- } // End of MBean class
public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
AMQCodecFactory codecFactory)
@@ -322,11 +122,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_managedObject.register();
}
- private ManagedAMQProtocolSession createMBean() throws AMQException
+ private AMQProtocolSessionMBean createMBean() throws AMQException
{
try
{
- return new ManagedAMQProtocolSession();
+ return new AMQProtocolSessionMBean(this);
}
catch(JMException ex)
{
@@ -335,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
+ public IoSession getIOSession()
+ {
+ return _minaProtocolSession;
+ }
+
public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
{
return (AMQProtocolSession) minaProtocolSession.getAttachment();
@@ -495,7 +300,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_contextKey = contextKey;
}
- public AMQChannel getChannel(int channelId) throws AMQException
+ public List<AMQChannel> getChannels()
+ {
+ return new ArrayList<AMQChannel>(_channelMap.values());
+ }
+
+ public AMQChannel getChannel(int channelId)
{
return _channelMap.get(channelId);
}
@@ -503,7 +313,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void addChannel(AMQChannel channel)
{
_channelMap.put(channel.getChannelId(), channel);
- _managedObject.checkForNotification();
+ checkForNotification();
+ }
+
+ private void checkForNotification()
+ {
+ int channelsCount = _channelMap.size();
+ if (channelsCount >= _maxNoOfChannels)
+ {
+ _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
+ }
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _maxNoOfChannels;
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _maxNoOfChannels = value;
+ }
+
+ public void commitTransactions(AMQChannel channel) throws AMQException
+ {
+ if (channel != null && channel.isTransactional())
+ {
+ channel.commit();
+ }
+ }
+
+ public void rollbackTransactions(AMQChannel channel) throws AMQException
+ {
+ if (channel != null && channel.isTransactional())
+ {
+ channel.rollback();
+ }
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
new file mode 100644
index 0000000000..b56d22d655
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -0,0 +1,240 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * This MBean class implements the management interface. In order to make more attributes, operations and notifications
+ * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
+ */
+@MBeanDescription("Management Bean for an AMQ Broker Connection")
+public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+{
+ private AMQMinaProtocolSession _session = null;
+ private String _name = null;
+ //openmbean data types for representing the channel attributes
+ private String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
+ private String[] _indexNames = {_channelAtttibuteNames[0]};
+ private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
+ private CompositeType _channelType = null; // represents the data type for channel data
+ private TabularType _channelsType = null; // Data type for list of channels type
+
+ @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+ public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException
+ {
+ super(ManagedConnection.class, ManagedConnection.TYPE);
+ _session = session;
+ init();
+ }
+
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
+ {
+ String remote = getRemoteAddress();
+ remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
+ _name = jmxEncode(new StringBuffer(remote), 0).toString();
+ _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
+ _channelAtttibuteNames, _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
+ }
+
+ public Date getLastIoTime()
+ {
+ return new Date(_session.getIOSession().getLastIoTime());
+ }
+
+ public String getRemoteAddress()
+ {
+ return _session.getIOSession().getRemoteAddress().toString();
+ }
+
+ public Long getWrittenBytes()
+ {
+ return _session.getIOSession().getWrittenBytes();
+ }
+
+ public Long getReadBytes()
+ {
+ return _session.getIOSession().getReadBytes();
+ }
+
+ public Long getMaximumNumberOfChannels()
+ {
+ return _session.getMaximumNumberOfChannels();
+ }
+
+ public void setMaximumNumberOfChannels(Long value)
+ {
+ _session.setMaximumNumberOfChannels(value);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _name;
+ }
+
+ /**
+ * commits transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if commit fails
+ */
+ public void commitTransactions(int channelId) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ _session.commitTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * rollsback the transactions for a transactional channel
+ *
+ * @param channelId
+ * @throws JMException if channel with given id doesn't exist or if rollback fails
+ */
+ public void rollbackTransactions(int channelId) throws JMException
+ {
+ try
+ {
+ AMQChannel channel = _session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ _session.rollbackTransactions(channel);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * Creates the list of channels in tabular form from the _channelMap.
+ *
+ * @return list of channels in tabular form.
+ * @throws OpenDataException
+ */
+ public TabularData channels() throws OpenDataException
+ {
+ TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
+ List<AMQChannel> list = _session.getChannels();
+
+ for (AMQChannel channel : list)
+ {
+ Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
+ (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
+ channel.getUnacknowledgedMessageMap().size()};
+
+ CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
+ channelsList.put(channelData);
+ }
+
+ return channelsList;
+ }
+
+ /**
+ * @see AMQMinaProtocolSession#closeChannel(int)
+ */
+ public void closeChannel(int id) throws JMException
+ {
+ AMQChannel channel = _session.getChannel(id);
+ if (channel == null)
+ {
+ throw new JMException("The channel (channel Id = " + id + ") does not exist");
+ }
+ try
+ {
+ _session.closeChannel(id);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * closes the connection. The administrator can use this management operation to close connection to free up
+ * resources.
+ * @throws JMException
+ */
+ public void closeConnection() throws JMException
+ {
+ try
+ {
+ _session.closeSession();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String name = MonitorNotification.class.getName();
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[]{info1};
+ }
+
+ public void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ _broadcaster.sendNotification(n);
+ }
+
+} // End of MBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 353a2007c0..f2ef97cf9a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,15 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -37,13 +30,7 @@ import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.*;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
@@ -108,7 +95,7 @@ public class AMQQueue implements Managable, Comparable
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -124,322 +111,6 @@ public class AMQQueue implements Managable, Comparable
return _name.compareTo(((AMQQueue) o).getName());
}
- /**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
- */
- @MBeanDescription("Management Interface for AMQQueue")
- private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
- {
- private String _queueName = null;
- // OpenMBean data types for viewMessages method
- private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
- private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
- private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
- // OpenMBean data types for viewMessageContent method
- private CompositeType _msgContentType = null;
- private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
- private OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
- @MBeanConstructor("Creates an MBean exposing an AMQQueue")
- public AMQQueueMBean() throws JMException
- {
- super(ManagedQueue.class, ManagedQueue.TYPE);
- init();
- }
-
- /**
- * initialises the openmbean data types
- */
- private void init() throws OpenDataException
- {
- _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
-
- _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
- }
-
- public String getObjectInstanceName()
- {
- return _queueName;
- }
-
- public String getName()
- {
- return _name;
- }
-
- public boolean isDurable()
- {
- return _durable;
- }
-
- public String getOwner()
- {
- return _owner;
- }
-
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
-
- public Integer getMessageCount()
- {
- return _deliveryMgr.getQueueMessageCount();
- }
-
- public Long getMaximumMessageSize()
- {
- return _maxMessageSize;
- }
-
- public void setMaximumMessageSize(Long value)
- {
- _maxMessageSize = value;
- }
-
- public Integer getConsumerCount()
- {
- return _subscribers.size();
- }
-
- public Integer getActiveConsumerCount()
- {
- return _subscribers.getWeight();
- }
-
- public Long getReceivedMessageCount()
- {
- return _totalMessagesReceived;
- }
-
- public Integer getMaximumMessageCount()
- {
- return _maxMessageCount;
- }
-
- public void setMaximumMessageCount(Integer value)
- {
- _maxMessageCount = value;
- }
-
- public Long getMaximumQueueDepth()
- {
- return _maxQueueDepth;
- }
-
- // Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
- {
- _maxQueueDepth = value;
- }
-
- /**
- * returns the size of messages(KB) in the queue.
- */
- public Long getQueueDepth()
- {
- List<AMQMessage> list = _deliveryMgr.getMessages();
- if (list.size() == 0)
- {
- return 0l;
- }
-
- long queueDepth = 0;
- for (AMQMessage message : list)
- {
- queueDepth = queueDepth + getMessageSize(message);
- }
- return (long)Math.round(queueDepth / 1000);
- }
-
- /**
- * returns size of message in bytes
- */
- private long getMessageSize(AMQMessage msg)
- {
- if (msg == null)
- {
- return 0l;
- }
-
- return msg.getContentHeaderBody().bodySize;
- }
-
- /**
- * Checks if there is any notification to be send to the listeners
- */
- private void checkForNotification(AMQMessage msg)
- {
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
-
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _maxMessageSize)
- {
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
-
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
- if (queueDepth >= _maxQueueDepth)
- {
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
- }
- }
-
- /**
- * Sends the notification to the listeners
- */
- private void notifyClients(String notificationMsg)
- {
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
-
- _broadcaster.sendNotification(n);
- }
-
- public void deleteMessageFromTop() throws JMException
- {
- try
- {
- _deliveryMgr.removeAMessageFromTop();
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- public void clearQueue() throws JMException
- {
- try
- {
- _deliveryMgr.clearAllMessages();
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * returns message content as byte array and related attributes for the given message id.
- */
- public CompositeData viewMessageContent(long msgId) throws JMException
- {
- List<AMQMessage> list = _deliveryMgr.getMessages();
- AMQMessage msg = null;
- for (AMQMessage message : list)
- {
- if (message.getMessageId() == msgId)
- {
- msg = message;
- break;
- }
- }
-
- if (msg == null)
- {
- throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName );
- }
- // get message content
- List<ContentBody> cBodies = msg.getContentBodies();
- List<Byte> msgContent = new ArrayList<Byte>();
- for (ContentBody body : cBodies)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.payload.slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
-
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
- String mimeType = headerProperties.getContentType();
- String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
-
- return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
- }
-
- /**
- * Returns the header contents of the messages stored in this queue in tabular form.
- */
- public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
- {
- if ((beginIndex > endIndex) || (beginIndex < 1))
- {
- throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex +
- "\nFrom Index should be greater than 0 and less than To Index");
- }
-
- List<AMQMessage> list = _deliveryMgr.getMessages();
- TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
-
- // Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
- {
- AMQMessage msg = list.get(i - 1);
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties;
- List<String> headerAttribsList = new ArrayList<String>();
- headerAttribsList.add("App Id=" + headerProperties.getAppId());
- headerAttribsList.add("MimeType=" + headerProperties.getContentType());
- headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId());
- headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
- headerAttribsList.add(headerProperties.toString());
-
- Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]),
- headerBody.bodySize, msg.isRedelivered()};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
- _messageList.put(messageData);
- }
-
- return _messageList;
- }
-
- /**
- * returns Notifications sent by this MBean.
- */
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
- String name = MonitorNotification.class.getName();
- String description = "Either Message count or Queue depth or Message size has reached threshold high value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[]{info1};
- }
-
- } // End of AMQMBean class
-
public AMQQueue(String name, boolean durable, String owner,
boolean autoDelete, QueueRegistry queueRegistry)
throws AMQException
@@ -533,7 +204,7 @@ public class AMQQueue implements Managable, Comparable
{
try
{
- return new AMQQueueMBean();
+ return new AMQQueueMBean(this);
}
catch (JMException ex)
{
@@ -566,16 +237,112 @@ public class AMQQueue implements Managable, Comparable
return _autoDelete;
}
+ /**
+ * @return no of messages(undelivered) on the queue.
+ */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
+ /**
+ * @return List of messages(undelivered) on the queue.
+ */
+ public List<AMQMessage> getMessagesOnTheQueue()
+ {
+ return _deliveryMgr.getMessages();
+ }
+
+ /**
+ * @param messageId
+ * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
+ */
+ public AMQMessage getMessageOnTheQueue(long messageId)
+ {
+ List<AMQMessage> list = getMessagesOnTheQueue();
+ AMQMessage msg = null;
+ for (AMQMessage message : list)
+ {
+ if (message.getMessageId() == messageId)
+ {
+ msg = message;
+ break;
+ }
+ }
+
+ return msg;
+ }
+
+ /**
+ * @return MBean object associated with this Queue
+ */
public ManagedObject getManagedObject()
{
return _managedObject;
}
+ public Long getMaximumMessageSize()
+ {
+ return _maxMessageSize;
+ }
+
+ public void setMaximumMessageSize(Long value)
+ {
+ _maxMessageSize = value;
+ }
+
+ public Integer getConsumerCount()
+ {
+ return _subscribers.size();
+ }
+
+ public Integer getActiveConsumerCount()
+ {
+ return _subscribers.getWeight();
+ }
+
+ public Long getReceivedMessageCount()
+ {
+ return _totalMessagesReceived;
+ }
+
+ public Integer getMaximumMessageCount()
+ {
+ return _maxMessageCount;
+ }
+
+ public void setMaximumMessageCount(Integer value)
+ {
+ _maxMessageCount = value;
+ }
+
+ public Long getMaximumQueueDepth()
+ {
+ return _maxQueueDepth;
+ }
+
+ // Sets the queue depth, the max queue size
+ public void setMaximumQueueDepth(Long value)
+ {
+ _maxQueueDepth = value;
+ }
+
+ /**
+ * Removes the AMQMessage from the top of the queue.
+ */
+ public void deleteMessageFromTop() throws AMQException
+ {
+ _deliveryMgr.removeAMessageFromTop();
+ }
+
+ /**
+ * removes all the messages from the queue.
+ */
+ public void clearQueue() throws AMQException
+ {
+ _deliveryMgr.clearAllMessages();
+ }
+
public void bind(String routingKey, Exchange exchange)
{
_bindings.addBinding(routingKey, exchange);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
new file mode 100644
index 0000000000..54dd366d71
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.Notification;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.OperationsException;
+import javax.management.monitor.MonitorNotification;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * MBean class for AMQQueue. It implements all the management features exposed
+ * for an AMQQueue.
+ */
+@MBeanDescription("Management Interface for AMQQueue")
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+{
+ private AMQQueue _queue = null;
+ private String _queueName = null;
+ // OpenMBean data types for viewMessages method
+ private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
+ private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+ private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+ private CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
+
+ // OpenMBean data types for viewMessageContent method
+ private CompositeType _msgContentType = null;
+ private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
+
+ @MBeanConstructor("Creates an MBean exposing an AMQQueue")
+ public AMQQueueMBean(AMQQueue queue) throws JMException
+ {
+ super(ManagedQueue.class, ManagedQueue.TYPE);
+ _queue = queue;
+ _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
+ init();
+ }
+
+ /**
+ * initialises the openmbean data types
+ */
+ private void init() throws OpenDataException
+ {
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
+ _msgContentAttributes, _msgContentAttributeTypes);
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+
+ _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return _queueName;
+ }
+
+ public String getName()
+ {
+ return _queueName;
+ }
+
+ public boolean isDurable()
+ {
+ return _queue.isDurable();
+ }
+
+ public String getOwner()
+ {
+ return _queue.getOwner();
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _queue.isAutoDelete();
+ }
+
+ public Integer getMessageCount()
+ {
+ return _queue.getMessageCount();
+ }
+
+ public Long getMaximumMessageSize()
+ {
+ return _queue.getMaximumMessageSize();
+ }
+
+ public void setMaximumMessageSize(Long value)
+ {
+ _queue.setMaximumMessageSize(value);
+ }
+
+ public Integer getConsumerCount()
+ {
+ return _queue.getConsumerCount();
+ }
+
+ public Integer getActiveConsumerCount()
+ {
+ return _queue.getActiveConsumerCount();
+ }
+
+ public Long getReceivedMessageCount()
+ {
+ return _queue.getReceivedMessageCount();
+ }
+
+ public Integer getMaximumMessageCount()
+ {
+ return _queue.getMaximumMessageCount();
+ }
+
+ public void setMaximumMessageCount(Integer value)
+ {
+ _queue.setMaximumMessageCount(value);
+ }
+
+ public Long getMaximumQueueDepth()
+ {
+ return _queue.getMaximumQueueDepth();
+ }
+
+ public void setMaximumQueueDepth(Long value)
+ {
+ _queue.setMaximumQueueDepth(value);
+ }
+
+ /**
+ * returns the size of messages(KB) in the queue.
+ */
+ public Long getQueueDepth()
+ {
+ List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ if (list.size() == 0)
+ {
+ return 0l;
+ }
+
+ long queueDepth = 0;
+ for (AMQMessage message : list)
+ {
+ queueDepth = queueDepth + getMessageSize(message);
+ }
+ return (long) Math.round(queueDepth / 1000);
+ }
+
+ /**
+ * returns size of message in bytes
+ */
+ private long getMessageSize(AMQMessage msg)
+ {
+ if (msg == null)
+ {
+ return 0l;
+ }
+
+ return msg.getContentHeaderBody().bodySize;
+ }
+
+ /**
+ * Checks if there is any notification to be send to the listeners
+ */
+ public void checkForNotification(AMQMessage msg)
+ {
+ // Check for threshold message count
+ Integer msgCount = getMessageCount();
+ if (msgCount >= getMaximumMessageCount())
+ {
+ notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
+ }
+
+ // Check for threshold message size
+ long messageSize = getMessageSize(msg);
+ if (messageSize >= _queue.getMaximumMessageSize())
+ {
+ notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
+ }
+
+ // Check for threshold queue depth in bytes
+ long queueDepth = getQueueDepth();
+ if (queueDepth >= _queue.getMaximumQueueDepth())
+ {
+ notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ }
+ }
+
+ /**
+ * Sends the notification to the listeners
+ */
+ private void notifyClients(String notificationMsg)
+ {
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+
+ _broadcaster.sendNotification(n);
+ }
+
+ /**
+ * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop()
+ */
+ public void deleteMessageFromTop() throws JMException
+ {
+ try
+ {
+ _queue.deleteMessageFromTop();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * @see org.apache.qpid.server.queue.AMQQueue#clearQueue()
+ */
+ public void clearQueue() throws JMException
+ {
+ try
+ {
+ _queue.clearQueue();
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ /**
+ * returns message content as byte array and related attributes for the given message id.
+ */
+ public CompositeData viewMessageContent(long msgId) throws JMException
+ {
+ AMQMessage msg = _queue.getMessageOnTheQueue(msgId);
+ if (msg == null)
+ {
+ throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
+ }
+ // get message content
+ List<ContentBody> cBodies = msg.getContentBodies();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ for (ContentBody body : cBodies)
+ {
+ if (body.getSize() != 0)
+ {
+ ByteBuffer slice = body.payload.slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
+ }
+ }
+
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = headerProperties.getContentType();
+ String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+
+ return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+ }
+
+ /**
+ * Returns the header contents of the messages stored in this queue in tabular form.
+ */
+ public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
+ {
+ if ((beginIndex > endIndex) || (beginIndex < 1))
+ {
+ throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex +
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
+ }
+
+ List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
+
+ // Create the tabular list of message header contents
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ {
+ AMQMessage msg = list.get(i - 1);
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ String[] headerAttributes = headerProperties.toString().split(",");
+ Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+ _messageList.put(messageData);
+ }
+
+ return _messageList;
+ }
+
+ /**
+ * returns Notifications sent by this MBean.
+ */
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String name = MonitorNotification.class.getName();
+ String description = "Either Message count or Queue depth or Message size has reached threshold high value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[]{info1};
+ }
+
+} // End of AMQMBean class