diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java | 417 |
1 files changed, 417 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java new file mode 100644 index 0000000000..fcac78fafa --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -0,0 +1,417 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.protocol; + +import java.util.Date; +import java.util.List; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.MBeanNotificationInfo; +import javax.management.NotCompliantMBeanException; +import javax.management.Notification; +import javax.management.ObjectName; +import javax.management.monitor.MonitorNotification; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.management.common.mbeans.ManagedConnection; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; + +/** + * This MBean class implements the management interface. In order to make more attributes, operations and notifications + * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here. + */ +@MBeanDescription("Management Bean for an AMQ Broker Connection") +public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection +{ + private AMQProtocolSession _protocolSession = null; + private String _name = null; + + // openmbean data types for representing the channel attributes + + private static final OpenType[] _channelAttributeTypes = + { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN }; + private static CompositeType _channelType = null; // represents the data type for channel data + private static TabularType _channelsType = null; // Data type for list of channels type + private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION = + new AMQShortString("Broker Management Console has closed the connection."); + + @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") + public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException + { + super(ManagedConnection.class, ManagedConnection.TYPE); + _protocolSession = amqProtocolSession; + String remote = getRemoteAddress(); + _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote; + init(); + } + + static + { + try + { + init(); + } + catch (JMException ex) + { + // This is not expected to ever occur. + throw new RuntimeException("Got JMException in static initializer.", ex); + } + } + + /** + * initialises the openmbean data types + */ + private static void init() throws OpenDataException + { + _channelType = + new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), + COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes); + _channelsType = new TabularType("Channels", "Channels", _channelType, TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()])); + } + + public String getClientId() + { + return String.valueOf(_protocolSession.getContextKey()); + } + + public String getAuthorizedId() + { + return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null; + } + + public String getVersion() + { + return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString(); + } + + public Date getLastIoTime() + { + return new Date(_protocolSession.getLastIoTime()); + } + + public String getRemoteAddress() + { + return _protocolSession.getRemoteAddress().toString(); + } + + public ManagedObject getParentObject() + { + return _protocolSession.getVirtualHost().getManagedObject(); + } + + public Long getWrittenBytes() + { + return _protocolSession.getWrittenBytes(); + } + + public Long getReadBytes() + { + return _protocolSession.getWrittenBytes(); + } + + public Long getMaximumNumberOfChannels() + { + return _protocolSession.getMaximumNumberOfChannels(); + } + + public void setMaximumNumberOfChannels(Long value) + { + _protocolSession.setMaximumNumberOfChannels(value); + } + + public String getObjectInstanceName() + { + return ObjectName.quote(_name); + } + + /** + * commits transactions for a transactional channel + * + * @param channelId + * @throws JMException if channel with given id doesn't exist or if commit fails + */ + public void commitTransactions(int channelId) throws JMException + { + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); + try + { + AMQChannel channel = _protocolSession.getChannel(channelId); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); + } + + _protocolSession.commitTransactions(channel); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + finally + { + CurrentActor.remove(); + } + } + + /** + * rollsback the transactions for a transactional channel + * + * @param channelId + * @throws JMException if channel with given id doesn't exist or if rollback fails + */ + public void rollbackTransactions(int channelId) throws JMException + { + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); + try + { + AMQChannel channel = _protocolSession.getChannel(channelId); + if (channel == null) + { + throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); + } + + _protocolSession.rollbackTransactions(channel); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + finally + { + CurrentActor.remove(); + } + } + + /** + * Creates the list of channels in tabular form from the _channelMap. + * + * @return list of channels in tabular form. + * @throws OpenDataException + */ + public TabularData channels() throws OpenDataException + { + TabularDataSupport channelsList = new TabularDataSupport(_channelsType); + List<AMQChannel> list = _protocolSession.getChannels(); + + for (AMQChannel channel : list) + { + Object[] itemValues = + { + channel.getChannelId(), channel.isTransactional(), + (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getNameShortString().asString() : null, + channel.getUnacknowledgedMessageMap().size(), channel.getBlocking() + }; + + CompositeData channelData = new CompositeDataSupport(_channelType, + COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues); + channelsList.put(channelData); + } + + return channelsList; + } + + /** + * closes the connection. The administrator can use this management operation to close connection to free up + * resources. + * @throws JMException + */ + public void closeConnection() throws JMException + { + + MethodRegistry methodRegistry = _protocolSession.getMethodRegistry(); + ConnectionCloseBody responseBody = + methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), + // replyCode + BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION, + // replyText, + 0, + 0); + + // This seems ugly but because we use closeConnection in both normal + // broker operation and as part of the management interface it cannot + // be avoided. The Current Actor will be null when this method is + // called via the Management interface. This is because we allow the + // Local API connection with JConsole. If we did not allow that option + // then the CurrentActor could be set in our JMX Proxy object. + // As it is we need to set the CurrentActor on all MBean methods + // Ideally we would not have a single method that can be called from + // two contexts. + boolean removeActor = false; + if (CurrentActor.get() == null) + { + removeActor = true; + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); + } + + try + { + _protocolSession.writeFrame(responseBody.generateFrame(0)); + + try + { + + _protocolSession.closeSession(); + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + finally + { + if (removeActor) + { + CurrentActor.remove(); + } + } + } + + @Override + public MBeanNotificationInfo[] getNotificationInfo() + { + String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED }; + String name = MonitorNotification.class.getName(); + String description = "Channel count has reached threshold value"; + MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); + + return new MBeanNotificationInfo[] { info1 }; + } + + public void notifyClients(String notificationMsg) + { + Notification n = + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + System.currentTimeMillis(), notificationMsg); + _broadcaster.sendNotification(n); + } + + public void resetStatistics() throws Exception + { + _protocolSession.resetStatistics(); + } + + public double getPeakMessageDeliveryRate() + { + return _protocolSession.getMessageDeliveryStatistics().getPeak(); + } + + public double getPeakDataDeliveryRate() + { + return _protocolSession.getDataDeliveryStatistics().getPeak(); + } + + public double getMessageDeliveryRate() + { + return _protocolSession.getMessageDeliveryStatistics().getRate(); + } + + public double getDataDeliveryRate() + { + return _protocolSession.getDataDeliveryStatistics().getRate(); + } + + public long getTotalMessagesDelivered() + { + return _protocolSession.getMessageDeliveryStatistics().getTotal(); + } + + public long getTotalDataDelivered() + { + return _protocolSession.getDataDeliveryStatistics().getTotal(); + } + + public double getPeakMessageReceiptRate() + { + return _protocolSession.getMessageReceiptStatistics().getPeak(); + } + + public double getPeakDataReceiptRate() + { + return _protocolSession.getDataReceiptStatistics().getPeak(); + } + + public double getMessageReceiptRate() + { + return _protocolSession.getMessageReceiptStatistics().getRate(); + } + + public double getDataReceiptRate() + { + return _protocolSession.getDataReceiptStatistics().getRate(); + } + + public long getTotalMessagesReceived() + { + return _protocolSession.getMessageReceiptStatistics().getTotal(); + } + + public long getTotalDataReceived() + { + return _protocolSession.getDataReceiptStatistics().getTotal(); + } + + public boolean isStatisticsEnabled() + { + return _protocolSession.isStatisticsEnabled(); + } + + public void setStatisticsEnabled(boolean enabled) + { + _protocolSession.setStatisticsEnabled(enabled); + } +} |