diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java | 115 |
1 files changed, 40 insertions, 75 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 46289ecab8..fd97bf0d96 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -36,7 +36,10 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; -import javax.management.*; +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.*; import javax.security.sasl.SaslServer; @@ -97,66 +100,35 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * augment the ManagedConnection interface and add the appropriate implementation here. */ @MBeanDescription("Management Bean for an AMQ Broker Connection") - private final class ManagedAMQProtocolSession extends AMQManagedObject - implements ManagedConnection + private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection { private String _name = null; - /** - * Represents the channel attributes sent with channel data. - */ - private String[] _channelAtttibuteNames = { "ChannelId", - "Transactional", - "DefaultQueue", - "UnacknowledgedMessageCount"}; - private String[] _channelAttributeDescriptions = { "Channel Identifier", - "is Channel Transactional?", - "Default Queue Name", - "Unacknowledged Message Count"}; - private OpenType[] _channelAttributeTypes = { SimpleType.INTEGER, - SimpleType.BOOLEAN, - SimpleType.STRING, - SimpleType.INTEGER}; - - private String[] _indexNames = { "ChannelId" }; //Channels in the list will be indexed according to channelId. - private CompositeType _channelType = null; // represents the data type for channel data - private TabularType _channelsType = null; // Datatype for list of channelsType + //openmbean data types for representing the channel attributes + private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; + private String[] _indexNames = {_channelAtttibuteNames[0]}; + private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; + private CompositeType _channelType = null; // represents the data type for channel data + private TabularType _channelsType = null; // Data type for list of channels type private TabularDataSupport _channelsList = null; @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public ManagedAMQProtocolSession() throws NotCompliantMBeanException + public ManagedAMQProtocolSession() throws JMException { super(ManagedConnection.class, ManagedConnection.TYPE); init(); } /** - * initialises the CompositeTypes and TabularType attributes. + * initialises the openmbean data types */ - private void init() + private void init() throws OpenDataException { String remote = getRemoteAddress(); remote = "anonymous".equals(remote) ? remote + hashCode() : remote; _name = jmxEncode(new StringBuffer(remote), 0).toString(); - - try - { - _channelType = new CompositeType("channel", - "Channel Details", - _channelAtttibuteNames, - _channelAttributeDescriptions, - _channelAttributeTypes); - - _channelsType = new TabularType("channelsType", - "List of available channels", - _channelType, - _indexNames); - } - catch(OpenDataException ex) - { - // It should never occur. - _logger.error("OpenDataTypes could not be created.", ex); - throw new RuntimeException(ex); - } + _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, + _channelAtttibuteNames, _channelAttributeTypes); + _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); } public Date getLastIoTime() @@ -179,12 +151,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _minaProtocolSession.getReadBytes(); } - public Long getMaximumNumberOfAllowedChannels() + public Long getMaximumNumberOfChannels() { return _maxNoOfChannels; } - public void setMaximumNumberOfAllowedChannels(Long value) + public void setMaximumNumberOfChannels(Long value) { _maxNoOfChannels = value; } @@ -239,30 +211,25 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * @return list of channels in tabular form. * @throws OpenDataException */ - public TabularData getChannels() throws OpenDataException + public TabularData channels() throws OpenDataException { _channelsList = new TabularDataSupport(_channelsType); for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet()) { AMQChannel channel = entry.getValue(); - Object[] itemValues = {channel.getChannelId(), - channel.isTransactional(), + Object[] itemValues = {channel.getChannelId(), channel.isTransactional(), (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null, channel.getUnacknowledgedMessageMap().size()}; - CompositeData channelData = new CompositeDataSupport(_channelType, - _channelAtttibuteNames, - itemValues); - + CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues); _channelsList.put(channelData); } return _channelsList; } - - public void closeChannel(int id) - throws Exception + + public void closeChannel(int id) throws Exception { try { @@ -274,8 +241,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - public void closeConnection() - throws Exception + public void closeConnection() throws Exception { try { @@ -290,13 +256,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, @Override public MBeanNotificationInfo[] getNotificationInfo() { - String[] notificationTypes = new String[] - {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; String name = MonitorNotification.class.getName(); - String description = "An attribute of this MBean has reached threshold value"; - MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, - name, - description); + String description = "Channel count has reached threshold value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); return new MBeanNotificationInfo[] {info1}; } @@ -304,14 +267,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private void checkForNotification() { int channelsCount = _channelMap.size(); - if (channelsCount >= getMaximumNumberOfAllowedChannels()) + if (channelsCount >= getMaximumNumberOfChannels()) { - Notification n = new Notification( - MonitorNotification.THRESHOLD_VALUE_EXCEEDED, - this, - ++_notificationSequenceNumber, - System.currentTimeMillis(), - "ChannelsCount = " + channelsCount + ", ChannelsCount has reached the threshold value"); + Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + ++_notificationSequenceNumber, System.currentTimeMillis(), + "Channel count (" + channelsCount + ") has reached the threshold value"); _broadcaster.sendNotification(n); } @@ -347,10 +307,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return new ManagedAMQProtocolSession(); } - catch(NotCompliantMBeanException ex) + catch(JMException ex) { - _logger.error("AMQProtocolSession MBean creation has failed.", ex); - throw new AMQException("AMQProtocolSession MBean creation has failed.", ex); + _logger.error("AMQProtocolSession MBean creation has failed ", ex); + throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); } } @@ -389,6 +349,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, int i = pv.length - 1; _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); // TODO: Close connection (but how to wait until message is sent?) + // ritchiem 2006-12-04 will this not do? +// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); +// future.join(); +// close connection + } } else |