summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java115
1 files changed, 40 insertions, 75 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 46289ecab8..fd97bf0d96 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -36,7 +36,10 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
-import javax.management.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.*;
import javax.security.sasl.SaslServer;
@@ -97,66 +100,35 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* augment the ManagedConnection interface and add the appropriate implementation here.
*/
@MBeanDescription("Management Bean for an AMQ Broker Connection")
- private final class ManagedAMQProtocolSession extends AMQManagedObject
- implements ManagedConnection
+ private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
{
private String _name = null;
- /**
- * Represents the channel attributes sent with channel data.
- */
- private String[] _channelAtttibuteNames = { "ChannelId",
- "Transactional",
- "DefaultQueue",
- "UnacknowledgedMessageCount"};
- private String[] _channelAttributeDescriptions = { "Channel Identifier",
- "is Channel Transactional?",
- "Default Queue Name",
- "Unacknowledged Message Count"};
- private OpenType[] _channelAttributeTypes = { SimpleType.INTEGER,
- SimpleType.BOOLEAN,
- SimpleType.STRING,
- SimpleType.INTEGER};
-
- private String[] _indexNames = { "ChannelId" }; //Channels in the list will be indexed according to channelId.
- private CompositeType _channelType = null; // represents the data type for channel data
- private TabularType _channelsType = null; // Datatype for list of channelsType
+ //openmbean data types for representing the channel attributes
+ private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
+ private String[] _indexNames = {_channelAtttibuteNames[0]};
+ private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
+ private CompositeType _channelType = null; // represents the data type for channel data
+ private TabularType _channelsType = null; // Data type for list of channels type
private TabularDataSupport _channelsList = null;
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public ManagedAMQProtocolSession() throws NotCompliantMBeanException
+ public ManagedAMQProtocolSession() throws JMException
{
super(ManagedConnection.class, ManagedConnection.TYPE);
init();
}
/**
- * initialises the CompositeTypes and TabularType attributes.
+ * initialises the openmbean data types
*/
- private void init()
+ private void init() throws OpenDataException
{
String remote = getRemoteAddress();
remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
-
- try
- {
- _channelType = new CompositeType("channel",
- "Channel Details",
- _channelAtttibuteNames,
- _channelAttributeDescriptions,
- _channelAttributeTypes);
-
- _channelsType = new TabularType("channelsType",
- "List of available channels",
- _channelType,
- _indexNames);
- }
- catch(OpenDataException ex)
- {
- // It should never occur.
- _logger.error("OpenDataTypes could not be created.", ex);
- throw new RuntimeException(ex);
- }
+ _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
+ _channelAtttibuteNames, _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
public Date getLastIoTime()
@@ -179,12 +151,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _minaProtocolSession.getReadBytes();
}
- public Long getMaximumNumberOfAllowedChannels()
+ public Long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
}
- public void setMaximumNumberOfAllowedChannels(Long value)
+ public void setMaximumNumberOfChannels(Long value)
{
_maxNoOfChannels = value;
}
@@ -239,30 +211,25 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* @return list of channels in tabular form.
* @throws OpenDataException
*/
- public TabularData getChannels() throws OpenDataException
+ public TabularData channels() throws OpenDataException
{
_channelsList = new TabularDataSupport(_channelsType);
for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
{
AMQChannel channel = entry.getValue();
- Object[] itemValues = {channel.getChannelId(),
- channel.isTransactional(),
+ Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
(channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
channel.getUnacknowledgedMessageMap().size()};
- CompositeData channelData = new CompositeDataSupport(_channelType,
- _channelAtttibuteNames,
- itemValues);
-
+ CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
_channelsList.put(channelData);
}
return _channelsList;
}
-
- public void closeChannel(int id)
- throws Exception
+
+ public void closeChannel(int id) throws Exception
{
try
{
@@ -274,8 +241,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- public void closeConnection()
- throws Exception
+ public void closeConnection() throws Exception
{
try
{
@@ -290,13 +256,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]
- {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
- String description = "An attribute of this MBean has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
- name,
- description);
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
return new MBeanNotificationInfo[] {info1};
}
@@ -304,14 +267,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private void checkForNotification()
{
int channelsCount = _channelMap.size();
- if (channelsCount >= getMaximumNumberOfAllowedChannels())
+ if (channelsCount >= getMaximumNumberOfChannels())
{
- Notification n = new Notification(
- MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
- this,
- ++_notificationSequenceNumber,
- System.currentTimeMillis(),
- "ChannelsCount = " + channelsCount + ", ChannelsCount has reached the threshold value");
+ Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ ++_notificationSequenceNumber, System.currentTimeMillis(),
+ "Channel count (" + channelsCount + ") has reached the threshold value");
_broadcaster.sendNotification(n);
}
@@ -347,10 +307,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return new ManagedAMQProtocolSession();
}
- catch(NotCompliantMBeanException ex)
+ catch(JMException ex)
{
- _logger.error("AMQProtocolSession MBean creation has failed.", ex);
- throw new AMQException("AMQProtocolSession MBean creation has failed.", ex);
+ _logger.error("AMQProtocolSession MBean creation has failed ", ex);
+ throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
}
}
@@ -389,6 +349,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
int i = pv.length - 1;
_minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
// TODO: Close connection (but how to wait until message is sent?)
+ // ritchiem 2006-12-04 will this not do?
+// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+// future.join();
+// close connection
+
}
}
else