summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/AMQQueue.java132
1 files changed, 71 insertions, 61 deletions
diff --git a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
index 7fb6ddcb5c..189a8289ea 100644
--- a/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
@@ -125,30 +125,30 @@ public class AMQQueue implements Managable
//private MBeanInfo _mbeanInfo;
// AMQ message attribute names exposed.
- private String[] _msgAttributeNames = { "MessageId",
- "Redelivered",
- "Content's size",
- "Contents" };
+ private String[] _msgAttributeNames = {"MessageId",
+ "Redelivered",
+ "Content's size",
+ "Contents"};
// AMQ Message attribute descriptions.
- private String[] _msgAttributeDescriptions = { "Message Id",
- "Redelivered",
- "Message content's size in bytes",
- "Message content bodies" };
+ private String[] _msgAttributeDescriptions = {"Message Id",
+ "Redelivered",
+ "Message content's size in bytes",
+ "Message content bodies"};
// AMQ message attribute types.
- private OpenType[] _msgAttributeTypes = new OpenType[4];
+ private OpenType[] _msgAttributeTypes = new OpenType[4];
// Messages will be indexed according to the messageId.
- private String[] _msgAttributeIndex = { "MessageId"};
+ private String[] _msgAttributeIndex = {"MessageId"};
// Composite type for representing AMQ Message data.
private CompositeType _messageDataType = null;
// Datatype for representing AMQ messages list.
- private TabularType _messagelistDataType = null;
+ private TabularType _messagelistDataType = null;
- private String[] _contentNames = {"SerialNumber", "ContentBody"};
- private String[] _contentDesc = {"SerialNumber", "Message Content"};
- private String[] _contentIndex = {"SerialNumber"};
- private OpenType[] _contentType = new OpenType[2];
+ private String[] _contentNames = {"SerialNumber", "ContentBody"};
+ private String[] _contentDesc = {"SerialNumber", "Message Content"};
+ private String[] _contentIndex = {"SerialNumber"};
+ private OpenType[] _contentType = new OpenType[2];
private CompositeType _contentBodyType = null;
- private TabularType _contentBodyListType = null;
+ private TabularType _contentBodyListType = null;
@MBeanConstructor("Creates an MBean exposing an AMQQueue.")
public AMQQueueMBean() throws NotCompliantMBeanException
@@ -162,17 +162,17 @@ public class AMQQueue implements Managable
_queueName = jmxEncode(new StringBuffer(_name), 0).toString();
try
{
- _contentType[0] = SimpleType.INTEGER;
- _contentType[1] = new ArrayType(1, SimpleType.BYTE);
+ _contentType[0] = SimpleType.INTEGER;
+ _contentType[1] = new ArrayType(1, SimpleType.BYTE);
_contentBodyType = new CompositeType("Content",
- "Message body content",
- _contentNames,
- _contentDesc,
- _contentType);
+ "Message body content",
+ _contentNames,
+ _contentDesc,
+ _contentType);
_contentBodyListType = new TabularType("MessageContents",
- "MessageContent",
- _contentBodyType,
- _contentIndex);
+ "MessageContent",
+ _contentBodyType,
+ _contentIndex);
_msgAttributeTypes[0] = SimpleType.LONG;
_msgAttributeTypes[1] = SimpleType.BOOLEAN;
@@ -180,14 +180,14 @@ public class AMQQueue implements Managable
_msgAttributeTypes[3] = _contentBodyListType;
_messageDataType = new CompositeType("Message",
- "AMQ Message",
- _msgAttributeNames,
- _msgAttributeDescriptions,
- _msgAttributeTypes);
+ "AMQ Message",
+ _msgAttributeNames,
+ _msgAttributeDescriptions,
+ _msgAttributeTypes);
_messagelistDataType = new TabularType("Messages",
- "List of messages",
- _messageDataType,
- _msgAttributeIndex);
+ "List of messages",
+ _messageDataType,
+ _msgAttributeIndex);
}
catch (OpenDataException ex)
{
@@ -243,7 +243,7 @@ public class AMQQueue implements Managable
public Integer getActiveConsumerCount()
{
- return _subscribers.getWeight();
+ return _subscribers.getWeight();
}
public Long getReceivedMessageCount()
@@ -269,7 +269,7 @@ public class AMQQueue implements Managable
// Sets the queue depth, the max queue size
public void setQueueDepth(Long value)
{
- _queueDepth = value;
+ _queueDepth = value;
}
// Returns the size of messages in the queue
@@ -277,29 +277,36 @@ public class AMQQueue implements Managable
{
List<AMQMessage> list = _deliveryMgr.getMessages();
if (list.size() == 0)
+ {
return 0l;
+ }
long queueSize = 0;
for (AMQMessage message : list)
{
queueSize = queueSize + getMessageSize(message);
}
- return new Long(Math.round(queueSize/100));
+ return new Long(Math.round(queueSize / 100));
}
// Operations
// calculates the size of an AMQMessage
+
private long getMessageSize(AMQMessage msg)
{
if (msg == null)
+ {
return 0l;
+ }
List<ContentBody> cBodies = msg.getContentBodies();
long messageSize = 0;
for (ContentBody body : cBodies)
{
if (body != null)
+ {
messageSize = messageSize + body.getSize();
+ }
}
return messageSize;
}
@@ -318,8 +325,8 @@ public class AMQQueue implements Managable
long messageSize = getMessageSize(msg);
if (messageSize >= getMaximumMessageSize())
{
- notifyClients("MessageSize = " + messageSize + ", Message size (MessageID="+ msg.getMessageId() +
- ")is higher than the threshold value");
+ notifyClients("MessageSize = " + messageSize + ", Message size (MessageID=" + msg.getMessageId() +
+ ")is higher than the threshold value");
}
// Check for queue size in bytes
@@ -334,13 +341,13 @@ public class AMQQueue implements Managable
private void notifyClients(String notificationMsg)
{
Notification n = new Notification(
- MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
- this,
- ++_notificationSequenceNumber,
- System.currentTimeMillis(),
- notificationMsg);
+ MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
+ this,
+ ++_notificationSequenceNumber,
+ System.currentTimeMillis(),
+ notificationMsg);
- _broadcaster.sendNotification(n);
+ _broadcaster.sendNotification(n);
}
public void deleteMessageFromTop() throws JMException
@@ -349,7 +356,7 @@ public class AMQQueue implements Managable
{
_deliveryMgr.removeAMessageFromTop();
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
@@ -369,6 +376,7 @@ public class AMQQueue implements Managable
/**
* Returns the messages stored in this queue in tabular form.
+ *
* @param beginIndex
* @param endIndex
* @return AMQ messages in tabular form.
@@ -379,7 +387,7 @@ public class AMQQueue implements Managable
if ((beginIndex > endIndex) || (beginIndex < 1))
{
throw new JMException("FromIndex = " + beginIndex + ", ToIndex = " + endIndex +
- "\nFromIndex should be greater than 0 and less than ToIndex");
+ "\nFromIndex should be greater than 0 and less than ToIndex");
}
List<AMQMessage> list = _deliveryMgr.getMessages();
@@ -400,7 +408,7 @@ public class AMQQueue implements Managable
List<ContentBody> cBodies = msg.getContentBodies();
TabularDataSupport _contentList = new TabularDataSupport(_contentBodyListType);
- int contentSerialNo = 1;
+ int contentSerialNo = 1;
long size = 0;
for (ContentBody body : cBodies)
@@ -432,8 +440,9 @@ public class AMQQueue implements Managable
/**
* A utility to convert byte[] to Byte[]. Required to create composite
* type for message contents.
- * @param byteArray message content as byte[]
- * @return Byte[]
+ *
+ * @param byteArray message content as byte[]
+ * @return Byte[]
*/
private Byte[] getByteArray(byte[] byteArray)
{
@@ -450,20 +459,21 @@ public class AMQQueue implements Managable
/**
* Creates all the notifications this MBean can send.
+ *
* @return Notifications broadcasted by this MBean.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
String[] notificationTypes = new String[]
- {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
String description = "An attribute of this MBean has reached threshold value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
name,
description);
- return new MBeanNotificationInfo[] {info1};
+ return new MBeanNotificationInfo[]{info1};
}
} // End of AMQMBean class
@@ -552,7 +562,7 @@ public class AMQQueue implements Managable
{
return new AMQQueueMBean();
}
- catch(NotCompliantMBeanException ex)
+ catch (NotCompliantMBeanException ex)
{
throw new AMQException("AMQQueue MBean creation has failed.", ex);
}
@@ -616,10 +626,10 @@ public class AMQQueue implements Managable
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
ps,
consumerTag)))
- == null)
+ == null)
{
throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
- " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+ " and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
// if we are eligible for auto deletion, unregister from the queue registry
@@ -634,12 +644,12 @@ public class AMQQueue implements Managable
public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
{
- if(checkUnused && !_subscribers.isEmpty())
+ if (checkUnused && !_subscribers.isEmpty())
{
_logger.info("Will not delete " + this + " as it is in use.");
return 0;
}
- else if(checkEmpty && _deliveryMgr.getQueueMessageCount() > 0)
+ else if (checkEmpty && _deliveryMgr.getQueueMessageCount() > 0)
{
_logger.info("Will not delete " + this + " as it is not empty.");
return 0;
@@ -668,7 +678,7 @@ public class AMQQueue implements Managable
public void deliver(AMQMessage msg) throws AMQException
{
TxnBuffer buffer = msg.getTxnBuffer();
- if(buffer == null)
+ if (buffer == null)
{
//non-transactional
record(msg);
@@ -695,7 +705,7 @@ public class AMQQueue implements Managable
msg.checkDeliveredToConsumer();
updateReceivedMessageCount(msg);
}
- catch(NoConsumersException e)
+ catch (NoConsumersException e)
{
// as this message will be returned, it should be removed
// from the queue:
@@ -710,7 +720,7 @@ public class AMQQueue implements Managable
msg.dequeue(this);
msg.decrementReference();
}
- catch(MessageCleanupException e)
+ catch (MessageCleanupException e)
{
//Message was dequeued, but could notthen be deleted
//though it is no longer referenced. This should be very
@@ -718,7 +728,7 @@ public class AMQQueue implements Managable
//done through some form of manual intervention.
_logger.error(e, e);
}
- catch(AMQException e)
+ catch (AMQException e)
{
throw new FailedDequeueException(_name, e);
}
@@ -768,7 +778,7 @@ public class AMQQueue implements Managable
private void debug(String msg, Object... args)
{
- if(_logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
_logger.debug(MessageFormat.format(msg, args));
}
@@ -802,7 +812,7 @@ public class AMQQueue implements Managable
{
process(_msg);
}
- catch(FailedDequeueException e)
+ catch (FailedDequeueException e)
{
//TODO: is there anything else we can do here? I think not...
_logger.error("Error during commit of a queue delivery: " + e, e);