summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-12-01 12:39:14 +0000
committerRobert Gemmell <robbie@apache.org>2011-12-01 12:39:14 +0000
commite9a79b81de730c2346079b2ba8501f58222ee389 (patch)
tree96a9da042f225e39cb05cd1b57fdc136f3446808
parenta3f0153ea94ad6a9dddef87947c384f0798f7ec8 (diff)
downloadqpid-python-e9a79b81de730c2346079b2ba8501f58222ee389.tar.gz
QPID-2243: 0-10 protocol connections do not have a matching JMX MBean to allow management.
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1209052 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java86
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java264
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java12
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java151
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java14
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java229
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java23
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java229
-rwxr-xr-xjava/test-profiles/Java010Excludes4
12 files changed, 1052 insertions, 125 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java b/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java
new file mode 100644
index 0000000000..68350a1632
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java
@@ -0,0 +1,71 @@
+package org.apache.qpid.server.management;
+
+import javax.management.Notification;
+
+import javax.management.JMException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularType;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+public abstract class AbstractAMQManagedConnectionObject extends AMQManagedObject implements ManagedConnection
+{
+ protected final String _name;
+
+ protected static final OpenType[] _channelAttributeTypes = { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN };
+ protected static final CompositeType _channelType;
+ protected static final TabularType _channelsType;
+
+ protected static final String BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION_STR =
+ "Broker Management Console has closed the connection.";
+
+ static
+ {
+ try
+ {
+ _channelType = new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
+ COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes);
+ _channelsType = new TabularType("Channels", "Channels", _channelType, (String[]) TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]));
+ }
+ catch (JMException ex)
+ {
+ // This is not expected to ever occur.
+ throw new RuntimeException("Got JMException in static initializer.", ex);
+ }
+ }
+
+ protected AbstractAMQManagedConnectionObject(final String remoteAddress) throws NotCompliantMBeanException
+ {
+ super(ManagedConnection.class, ManagedConnection.TYPE);
+ _name = "anonymous".equals(remoteAddress) ? (remoteAddress + hashCode()) : remoteAddress;
+ }
+
+ @Override
+ public String getObjectInstanceName()
+ {
+ return ObjectName.quote(_name);
+ }
+
+ public void notifyClients(String notificationMsg)
+ {
+ final Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ System.currentTimeMillis(), notificationMsg);
+ _broadcaster.sendNotification(n);
+ }
+
+ @Override
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
+ String name = MonitorNotification.class.getName();
+ String description = "Channel count has reached threshold value";
+ MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+ return new MBeanNotificationInfo[] { info1 };
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 16d99de492..8d39420631 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -39,89 +39,44 @@ package org.apache.qpid.server.protocol;
import java.util.Date;
import java.util.List;
-
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
-import javax.management.ObjectName;
-import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.AbstractAMQManagedConnectionObject;
import org.apache.qpid.server.management.ManagedObject;
/**
* This MBean class implements the management interface. In order to make more attributes, operations and notifications
* available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
*/
-@MBeanDescription("Management Bean for an AMQ Broker Connection")
-public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+@MBeanDescription("Management Bean for an AMQ Broker 0-9-1/0-9/0-8 Connections")
+public class AMQProtocolSessionMBean extends AbstractAMQManagedConnectionObject
{
private AMQProtocolSession _protocolSession = null;
- private String _name = null;
- // openmbean data types for representing the channel attributes
-
- private static final OpenType[] _channelAttributeTypes =
- { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN };
- private static CompositeType _channelType = null; // represents the data type for channel data
- private static TabularType _channelsType = null; // Data type for list of channels type
private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
- new AMQShortString("Broker Management Console has closed the connection.");
+ new AMQShortString(BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION_STR);
- @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+ @MBeanConstructor("Creates an MBean exposing an AMQ Broker 0-9-1/0-9/0-8 Connection")
public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
{
- super(ManagedConnection.class, ManagedConnection.TYPE);
+ super(amqProtocolSession.getRemoteAddress().toString());
_protocolSession = amqProtocolSession;
- String remote = getRemoteAddress();
- _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
- init();
- }
-
- static
- {
- try
- {
- init();
- }
- catch (JMException ex)
- {
- // This is not expected to ever occur.
- throw new RuntimeException("Got JMException in static initializer.", ex);
- }
- }
-
- /**
- * initialises the openmbean data types
- */
- private static void init() throws OpenDataException
- {
- _channelType =
- new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
- COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes);
- _channelsType = new TabularType("Channels", "Channels", _channelType, TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]));
}
public String getClientId()
@@ -169,16 +124,6 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
return _protocolSession.getMaximumNumberOfChannels();
}
- public void setMaximumNumberOfChannels(Long value)
- {
- _protocolSession.setMaximumNumberOfChannels(value);
- }
-
- public String getObjectInstanceName()
- {
- return ObjectName.quote(_name);
- }
-
/**
* commits transactions for a transactional channel
*
@@ -321,25 +266,6 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
}
}
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
- String name = MonitorNotification.class.getName();
- String description = "Channel count has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[] { info1 };
- }
-
- public void notifyClients(String notificationMsg)
- {
- Notification n =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
- System.currentTimeMillis(), notificationMsg);
- _broadcaster.sendNotification(n);
- }
-
public void resetStatistics() throws Exception
{
_protocolSession.resetStatistics();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index e18b453db3..922531a271 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -24,6 +24,14 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+
+import org.apache.qpid.server.management.ManagedObject;
+
+import org.apache.qpid.server.management.Managable;
+
import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -55,7 +63,7 @@ import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements Managable, AMQConnectionModel, LogSubject, AuthorizationHolder
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
@@ -67,6 +75,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
+
+ private ServerConnectionMBean _mBean;
+ private VirtualHost _virtualHost;
+ private AtomicLong _lastIoTime = new AtomicLong();
public ServerConnection(final long connectionId)
{
@@ -133,9 +145,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
super.setConnectionDelegate(delegate);
}
- private VirtualHost _virtualHost;
-
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -144,8 +153,18 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void setVirtualHost(VirtualHost virtualHost)
{
_virtualHost = virtualHost;
-
+
initialiseStatistics();
+
+ try
+ {
+ _mBean = new ServerConnectionMBean(this);
+ _mBean.register();
+ }
+ catch (JMException jme)
+ {
+ log.error("Unable to create mBean for ServerConnection",jme);
+ }
}
public void setConnectionConfig(final ConnectionConfig config)
@@ -190,6 +209,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
@Override
public void received(ProtocolEvent event)
{
+ _lastIoTime.set(System.currentTimeMillis());
if (event.isConnectionControl())
{
CurrentActor.set(_actor);
@@ -260,6 +280,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void close(AMQConstant cause, String message) throws AMQException
{
closeSubscriptions();
+ if (_mBean != null)
+ {
+ _mBean.unregister();
+ _mBean = null;
+ }
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
@@ -405,6 +430,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void closed()
{
closeSubscriptions();
+ if (_mBean != null)
+ {
+ _mBean.unregister();
+ _mBean = null;
+ }
super.closed();
}
@@ -416,4 +446,30 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
}
+ @Override
+ public ManagedObject getManagedObject()
+ {
+ return _mBean;
+ }
+
+ @Override
+ public void send(ProtocolEvent event)
+ {
+ _lastIoTime.set(System.currentTimeMillis());
+ super.send(event);
+ }
+
+ public AtomicLong getLastIoTime()
+ {
+ return _lastIoTime;
+ }
+
+ void checkForNotification()
+ {
+ int channelsCount = getSessionModels().size();
+ if (_mBean != null && channelsCount >= getConnectionDelegate().getChannelMax())
+ {
+ _mBean.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 8d6e0e0d80..66ed6f1e62 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -28,10 +28,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
-
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerConfig;
@@ -49,6 +47,7 @@ import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ConnectionOpen;
import org.apache.qpid.transport.ConnectionOpenOk;
+import org.apache.qpid.transport.ConnectionStartOk;
import org.apache.qpid.transport.ConnectionTuneOk;
import org.apache.qpid.transport.ServerDelegate;
import org.apache.qpid.transport.Session;
@@ -62,6 +61,8 @@ public class ServerConnectionDelegate extends ServerDelegate
{
private final String _localFQDN;
private final IApplicationRegistry _appRegistry;
+ private int _maxNoOfChannels;
+ private Map<String,Object> _clientProperties;
public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
{
@@ -77,6 +78,7 @@ public class ServerConnectionDelegate extends ServerDelegate
_appRegistry = appRegistry;
_localFQDN = localFQDN;
+ _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
}
private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
@@ -154,7 +156,7 @@ public class ServerConnectionDelegate extends ServerDelegate
public void connectionOpen(Connection conn, ConnectionOpen open)
{
final ServerConnection sconn = (ServerConnection) conn;
-
+
VirtualHost vhost;
String vhostName;
if(open.hasVirtualHost())
@@ -222,7 +224,12 @@ public class ServerConnectionDelegate extends ServerDelegate
@Override
protected int getChannelMax()
{
- return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+ return _maxNoOfChannels;
+ }
+
+ protected void setChannelMax(int channelMax)
+ {
+ _maxNoOfChannels = channelMax;
}
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
@@ -253,6 +260,7 @@ public class ServerConnectionDelegate extends ServerDelegate
{
ssn = sessionAttachImpl(conn, atc);
conn.registerSession(ssn);
+ ((ServerConnection)conn).checkForNotification();
}
else
{
@@ -279,4 +287,16 @@ public class ServerConnectionDelegate extends ServerDelegate
}
return true;
}
+
+ @Override
+ public void connectionStartOk(Connection conn, ConnectionStartOk ok)
+ {
+ _clientProperties = ok.getClientProperties();
+ super.connectionStartOk(conn, ok);
+ }
+
+ public Map<String,Object> getClientProperties()
+ {
+ return _clientProperties;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java
new file mode 100644
index 0000000000..17c7bed601
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import javax.management.JMException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AbstractAMQManagedConnectionObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+
+/**
+ * This MBean class implements the management interface. In order to make more attributes, operations and notifications
+ * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
+ */
+@MBeanDescription("Management Bean for an AMQ Broker 0-10 Connection")
+public class ServerConnectionMBean extends AbstractAMQManagedConnectionObject
+{
+ private final ServerConnection _serverConnection;
+
+ @MBeanConstructor("Creates an MBean exposing an AMQ Broker 0-10 Connection")
+ protected ServerConnectionMBean(final ServerConnection serverConnection) throws NotCompliantMBeanException
+ {
+ super(serverConnection.getConfig().getAddress());
+ _serverConnection = serverConnection;
+ }
+
+ @Override
+ public ManagedObject getParentObject()
+ {
+ return _serverConnection.getVirtualHost().getManagedObject();
+ }
+
+ @Override
+ public String getClientId()
+ {
+ return _serverConnection.getClientId();
+ }
+
+ @Override
+ public String getAuthorizedId()
+ {
+ return _serverConnection.getAuthorizedPrincipal().getName();
+ }
+
+ @Override
+ public String getVersion()
+ {
+ return String.valueOf(_serverConnection.getConnectionDelegate().getClientProperties().get(ClientProperties.version.toString()));
+ }
+
+ @Override
+ public String getRemoteAddress()
+ {
+ return _serverConnection.getConfig().getAddress();
+ }
+
+ @Override
+ public Date getLastIoTime()
+ {
+ return new Date(_serverConnection.getLastIoTime().longValue());
+ }
+
+ @Override
+ public Long getMaximumNumberOfChannels()
+ {
+ return (long) _serverConnection.getConnectionDelegate().getChannelMax();
+ }
+
+ @Override
+ public TabularData channels() throws IOException, JMException
+ {
+ final TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
+ final List<AMQSessionModel> list = _serverConnection.getSessionModels();
+
+ for (final AMQSessionModel channel : list)
+ {
+ final ServerSession session = (ServerSession)channel;
+ Object[] itemValues =
+ {
+ session.getChannel(),
+ session.isTransactional(),
+ null,
+ session.getUnacknowledgedMessageCount(),
+ session.getBlocking()
+ };
+
+ final CompositeData channelData = new CompositeDataSupport(_channelType,
+ COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues);
+ channelsList.put(channelData);
+ }
+ return channelsList;
+ }
+
+ @Override
+ public void commitTransactions(int channelId) throws JMException
+ {
+ final ServerSession session = (ServerSession)_serverConnection.getSession(channelId);
+ if (session == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ else if (session.isTransactional())
+ {
+ CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+ try
+ {
+ session.commit();
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
+
+ @Override
+ public void rollbackTransactions(int channelId) throws JMException
+ {
+ final ServerSession session = (ServerSession)_serverConnection.getSession(channelId);
+ if (session == null)
+ {
+ throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+ }
+ else if (session.isTransactional())
+ {
+ CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+ try
+ {
+ session.rollback();
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
+
+ @Override
+ public void closeConnection() throws Exception
+ {
+ _serverConnection.mgmtClose();
+ }
+
+ @Override
+ public void resetStatistics() throws Exception
+ {
+ _serverConnection.resetStatistics();
+ }
+
+ @Override
+ public double getPeakMessageDeliveryRate()
+ {
+ return _serverConnection.getMessageDeliveryStatistics().getPeak();
+ }
+
+ @Override
+ public double getPeakDataDeliveryRate()
+ {
+ return _serverConnection.getDataDeliveryStatistics().getPeak();
+ }
+
+ @Override
+ public double getMessageDeliveryRate()
+ {
+ return _serverConnection.getMessageDeliveryStatistics().getRate();
+ }
+
+ @Override
+ public double getDataDeliveryRate()
+ {
+ return _serverConnection.getDataDeliveryStatistics().getRate();
+ }
+
+ @Override
+ public long getTotalMessagesDelivered()
+ {
+ return _serverConnection.getMessageDeliveryStatistics().getTotal();
+ }
+
+ @Override
+ public long getTotalDataDelivered()
+ {
+ return _serverConnection.getDataDeliveryStatistics().getTotal();
+ }
+
+ @Override
+ public double getPeakMessageReceiptRate()
+ {
+ return _serverConnection.getMessageReceiptStatistics().getPeak();
+ }
+
+ @Override
+ public double getPeakDataReceiptRate()
+ {
+ return _serverConnection.getDataReceiptStatistics().getPeak();
+ }
+
+ @Override
+ public double getMessageReceiptRate()
+ {
+ return _serverConnection.getMessageReceiptStatistics().getRate();
+ }
+
+ @Override
+ public double getDataReceiptRate()
+ {
+ return _serverConnection.getDataReceiptStatistics().getRate();
+ }
+
+ @Override
+ public long getTotalMessagesReceived()
+ {
+ return _serverConnection.getMessageReceiptStatistics().getTotal();
+ }
+
+ @Override
+ public long getTotalDataReceived()
+ {
+ return _serverConnection.getDataReceiptStatistics().getTotal();
+ }
+
+ @Override
+ public boolean isStatisticsEnabled()
+ {
+ return _serverConnection.isStatisticsEnabled();
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _serverConnection.setStatisticsEnabled(enabled);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 337d1f02cc..dfad097dbc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.security.auth.Subject;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
@@ -698,4 +696,14 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
unregister(subscription_0_10);
}
}
+
+ public int getUnacknowledgedMessageCount()
+ {
+ return _messageDispositionListenerMap.size();
+ }
+
+ public boolean getBlocking()
+ {
+ return false; //TODO: Blocking not implemented on 0-10 yet.
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java
new file mode 100644
index 0000000000..73e0bc5d27
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java
@@ -0,0 +1,151 @@
+package org.apache.qpid.server.configuration;
+
+import java.util.UUID;
+
+public class MockConnectionConfig implements ConnectionConfig
+{
+
+ public MockConnectionConfig(UUID _id, ConnectionConfigType _configType,
+ ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent, boolean _durable,
+ long _createTime, VirtualHostConfig _virtualHost, String _address, Boolean _incoming,
+ Boolean _systemConnection, Boolean _federationLink, String _authId, String _remoteProcessName,
+ Integer _remotePID, Integer _remoteParentPID, ConfigStore _configStore, Boolean _shadow)
+ {
+ super();
+ this._id = _id;
+ this._configType = _configType;
+ this._parent = _parent;
+ this._durable = _durable;
+ this._createTime = _createTime;
+ this._virtualHost = _virtualHost;
+ this._address = _address;
+ this._incoming = _incoming;
+ this._systemConnection = _systemConnection;
+ this._federationLink = _federationLink;
+ this._authId = _authId;
+ this._remoteProcessName = _remoteProcessName;
+ this._remotePID = _remotePID;
+ this._remoteParentPID = _remoteParentPID;
+ this._configStore = _configStore;
+ this._shadow = _shadow;
+ }
+
+ private UUID _id;
+ private ConnectionConfigType _configType;
+ private ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent;
+ private boolean _durable;
+ private long _createTime;
+ private VirtualHostConfig _virtualHost;
+ private String _address;
+ private Boolean _incoming;
+ private Boolean _systemConnection;
+ private Boolean _federationLink;
+ private String _authId;
+ private String _remoteProcessName;
+ private Integer _remotePID;
+ private Integer _remoteParentPID;
+ private ConfigStore _configStore;
+ private Boolean _shadow;
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public ConnectionConfigType getConfigType()
+ {
+ return _configType;
+ }
+
+ @Override
+ public ConfiguredObject<ConnectionConfigType, ConnectionConfig> getParent()
+ {
+ return _parent;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return _durable;
+ }
+
+ @Override
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ @Override
+ public VirtualHostConfig getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ @Override
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ @Override
+ public Boolean isIncoming()
+ {
+ return _incoming;
+ }
+
+ @Override
+ public Boolean isSystemConnection()
+ {
+ return _systemConnection;
+ }
+
+ @Override
+ public Boolean isFederationLink()
+ {
+ return _federationLink;
+ }
+
+ @Override
+ public String getAuthId()
+ {
+ return _authId;
+ }
+
+ @Override
+ public String getRemoteProcessName()
+ {
+ return _remoteProcessName;
+ }
+
+ @Override
+ public Integer getRemotePID()
+ {
+ return _remotePID;
+ }
+
+ @Override
+ public Integer getRemoteParentPID()
+ {
+ return _remoteParentPID;
+ }
+
+ @Override
+ public ConfigStore getConfigStore()
+ {
+ return _configStore;
+ }
+
+ @Override
+ public Boolean isShadow()
+ {
+ return _shadow;
+ }
+
+ @Override
+ public void mgmtClose()
+ {
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index 4df051edb5..e1dae5fcc1 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -20,23 +20,21 @@
*/
package org.apache.qpid.server.protocol;
-import junit.framework.TestCase;
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
-
-import javax.management.JMException;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
@@ -67,7 +65,7 @@ public class AMQProtocolSessionMBeanTest extends InternalBrokerBaseCase
assertTrue(channelCount == 2);
// general properties test
- _mbean.setMaximumNumberOfChannels(1000L);
+ _protocolSession.setMaximumNumberOfChannels(1000L);
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
diff --git a/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java
new file mode 100644
index 0000000000..78ba8c1645
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.server.configuration.MockConnectionConfig;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
+
+public class ServerConnectionMBeanTest extends InternalBrokerBaseCase
+{
+ private ServerConnection _serverConnection;
+ private ServerSessionMock _serverSession;
+ private ServerConnectionMBean _mbean;
+ private List<Session> _sessions = new ArrayList<Session>();
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ final VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+ _serverConnection = new ServerConnection(1)
+ {
+ protected Collection<Session> getChannels()
+ {
+ return _sessions;
+ }
+ public Session getSession(int channelId)
+ {
+ for(Session session : _sessions)
+ {
+ if (session.getChannel() == channelId)
+ {
+ return session;
+ }
+ }
+ return null;
+ }
+ @Override
+ public AtomicLong getLastIoTime()
+ {
+ return new AtomicLong(1);
+ }
+ };
+ final MockConnectionConfig config = new MockConnectionConfig(UUID.randomUUID(), null, null,
+ false, 1, vhost, "address", Boolean.TRUE, Boolean.TRUE, Boolean.TRUE,
+ "authid", "remoteProcessName", new Integer(1967), new Integer(1970), vhost.getConfigStore(), Boolean.FALSE);
+ _serverConnection.setConnectionConfig(config);
+ _serverConnection.setVirtualHost(vhost);
+ _serverConnection.setConnectionDelegate(new ServerConnectionDelegate(getRegistry(), ""));
+ _serverSession = new ServerSessionMock(_serverConnection, 1);
+ _mbean = (ServerConnectionMBean) _serverConnection.getManagedObject();
+ }
+
+ public void testChannels() throws Exception
+ {
+ // check the channel count is correct
+ TabularData tabularData = _mbean.channels();
+
+ int channelCount = tabularData.size();
+ assertEquals("Unexpected number of channels",1,channelCount);
+ _sessions.add(new ServerSession(_serverConnection, new ServerSessionDelegate(),
+ new Binary(getName().getBytes()), 2 , _serverConnection.getConfig()));
+
+ channelCount = _mbean.channels().size();
+ assertEquals("Unexpected number of channels",2,channelCount);
+
+ final CompositeData chanresult = tabularData.get(new Integer[]{1});
+ assertNotNull(chanresult);
+ assertEquals("Unexpected channel id", new Integer(1),(Integer)chanresult.get(ManagedConnection.CHAN_ID));
+ assertNull("Unexpected default queue", chanresult.get(ManagedConnection.DEFAULT_QUEUE));
+ assertFalse("Unexpected transactional flag", (Boolean)chanresult.get(ManagedConnection.TRANSACTIONAL));
+ assertFalse("Flow should have been blocked", (Boolean)chanresult.get(ManagedConnection.FLOW_BLOCKED));
+ assertEquals("Unexpected unack'd count", new Integer(1967), (Integer)chanresult.get(ManagedConnection.UNACKED_COUNT));
+ }
+
+ public void testMaxChannels() throws Exception
+ {
+ _serverConnection.getConnectionDelegate().setChannelMax(10001);
+ assertEquals("Max channels not got correctly", new Long(10001), _mbean.getMaximumNumberOfChannels());
+ }
+
+ public void testRollback() throws Exception
+ {
+ _mbean.rollbackTransactions(1);
+ assertFalse("Rollback performed despite not being transacted", _serverSession.isRolledback());
+
+ _serverSession.setTransactional(true);
+ _mbean.rollbackTransactions(1);
+ assertTrue("Rollback not performed", _serverSession.isRolledback());
+
+ try
+ {
+ _mbean.rollbackTransactions(2);
+ fail("Exception expected");
+ }
+ catch (JMException jme)
+ {
+ //pass
+ }
+ }
+
+ public void testCommit() throws Exception
+ {
+ _mbean.commitTransactions(1);
+ assertFalse("Commit performed despite not being transacted", _serverSession.isCommitted());
+
+ _serverSession.setTransactional(true);
+ _mbean.commitTransactions(1);
+ assertTrue("Commit not performed", _serverSession.isCommitted());
+
+ try
+ {
+ _mbean.commitTransactions(2);
+ fail("Exception expected");
+ }
+ catch (JMException jme)
+ {
+ //pass
+ }
+ }
+
+ public void testGetName()
+ {
+ assertEquals("Unexpected Object Instance Name", "\"address\"", _mbean.getObjectInstanceName());
+ }
+
+ public void testEnableStatistics()
+ {
+ assertFalse("Unexpected statistics enable flag", _mbean.isStatisticsEnabled());
+ _mbean.setStatisticsEnabled(true);
+ assertTrue("Unexpected statistics enable flag", _mbean.isStatisticsEnabled());
+ }
+
+ public void testLastIOTime()
+ {
+ assertEquals("Unexpected last IO time", new Date(1), _mbean.getLastIoTime());
+ }
+
+ private class ServerSessionMock extends ServerSession
+ {
+ private int _channelId = 0;
+ private boolean _committed = false;
+ private boolean _rolledback = false;
+ private boolean _transacted = false;
+
+ ServerSessionMock(Connection connection, int channelId)
+ {
+ super(connection, new ServerSessionDelegate(), new Binary(String.valueOf(channelId).getBytes()), 1 , _serverConnection.getConfig());
+ _channelId = channelId;
+ _sessions.add(this);
+ }
+
+ public int getChannel()
+ {
+ return _channelId;
+ }
+
+ @Override
+ public void commit()
+ {
+ _committed = true;
+ }
+
+ @Override
+ public void rollback()
+ {
+ _rolledback = true;
+ }
+
+ public boolean isCommitted()
+ {
+ return _committed;
+ }
+
+ public boolean isRolledback()
+ {
+ return _rolledback;
+ }
+
+ @Override
+ public int getUnacknowledgedMessageCount()
+ {
+ return 1967;
+ }
+
+ public boolean isTransactional()
+ {
+ return _transacted;
+ }
+
+ public void setTransactional(boolean transacted)
+ {
+ _transacted = transacted;
+ }
+ }
+}
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
index d16db65d5d..c2900a3533 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
@@ -80,33 +80,12 @@ public interface ManagedConnection
Date getLastIoTime();
/**
- * Tells the total number of bytes written till now.
- * @return number of bytes written.
- *
- @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
- Long getWrittenBytes();
- */
- /**
- * Tells the total number of bytes read till now.
- * @return number of bytes read.
- *
- @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
- Long getReadBytes();
- */
-
- /**
* Threshold high value for no of channels. This is useful in setting notifications or
* taking required action is there are more channels being created.
* @return threshold limit for no of channels
*/
- Long getMaximumNumberOfChannels();
-
- /**
- * Sets the threshold high value for number of channels for a connection
- * @param value
- */
@MBeanAttribute(name="MaximumNumberOfChannels", description="The threshold high value for number of channels for this connection")
- void setMaximumNumberOfChannels(Long value);
+ Long getMaximumNumberOfChannels();
//********** Operations *****************//
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
new file mode 100644
index 0000000000..5598dda5de
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.management.jmx;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.JMException;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
+{
+ /**
+ * JMX helper.
+ */
+ private JMXTestUtils _jmxUtils;
+ private Connection _connection;
+
+ public void setUp() throws Exception
+ {
+ _jmxUtils = new JMXTestUtils(this);
+ _jmxUtils.setUp();
+ super.setUp();
+ _jmxUtils.open();
+ _connection = getConnection();
+ }
+
+ public void tearDown() throws Exception
+ {
+ if (_jmxUtils != null)
+ {
+ _jmxUtils.close();
+ }
+ super.tearDown();
+ }
+
+ public void testChannels() throws Exception
+ {
+ final String queueName = getTestQueueName();
+
+ final Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Destination destination = session.createQueue(queueName);
+ final MessageConsumer consumer = session.createConsumer(destination);
+
+ final int numberOfMessages = 2;
+ sendMessage(session, destination, numberOfMessages);
+ _connection.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ final Message m = consumer.receive(1000l);
+ assertNotNull("Message " + i + " is not received", m);
+ }
+
+ List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+ assertNotNull("Connection MBean is not found", connections);
+ assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+ final ManagedConnection mBean = connections.get(0);
+ assertNotNull("Connection MBean is null", mBean);
+
+ TabularData channelsData = mBean.channels();
+ assertNotNull("Channels data are null", channelsData);
+ assertEquals("Unexpected number of rows in channel table", 1, channelsData.size());
+
+ final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
+ final CompositeDataSupport row = rowItr.next();
+ Number unackCount = (Number) row.get(ManagedConnection.UNACKED_COUNT);
+ final Boolean transactional = (Boolean) row.get(ManagedConnection.TRANSACTIONAL);
+ final Boolean flowBlocked = (Boolean) row.get(ManagedConnection.FLOW_BLOCKED);
+ assertNotNull("Channel should have unacknowledged messages", unackCount);
+ assertEquals("Unexpected number of unacknowledged messages", 2, unackCount.intValue());
+ assertNotNull("Channel should have transaction flag", transactional);
+ assertTrue("Unexpected transaction flag", transactional);
+ assertNotNull("Channel should have flow blocked flag", flowBlocked);
+ assertFalse("Unexpected value of flow blocked flag", flowBlocked);
+
+ final Date initialLastIOTime = mBean.getLastIoTime();
+ session.commit();
+ assertTrue("Last IO time should have been updated", mBean.getLastIoTime().after(initialLastIOTime));
+
+ channelsData = mBean.channels();
+ assertNotNull("Channels data are null", channelsData);
+ assertEquals("Unexpected number of rows in channel table", 1, channelsData.size());
+
+ final Iterator<CompositeDataSupport> rowItr2 = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
+ final CompositeDataSupport row2 = rowItr2.next();
+ unackCount = (Number) row2.get(ManagedConnection.UNACKED_COUNT);
+ assertNotNull("Channel should have unacknowledged messages", unackCount);
+ assertEquals("Unexpected number of anacknowledged messages", 0, unackCount.intValue());
+
+ _connection.close();
+
+ connections = _jmxUtils.getManagedConnections("test");
+ assertNotNull("Connection MBean is not found", connections);
+ assertEquals("Unexpected number of connection mbeans", 0, connections.size());
+ }
+
+ public void testCommit() throws Exception
+ {
+ final String queueName = getTestQueueName();
+
+ final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Destination destination = producerSession.createQueue(queueName);
+ final MessageConsumer consumer = consumerSession.createConsumer(destination);
+ final MessageProducer producer = producerSession.createProducer(destination);
+
+ _connection.start();
+
+ List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+ assertNotNull("Connection MBean is not found", connections);
+ assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+ final ManagedConnection mBean = connections.get(0);
+ assertNotNull("Connection MBean is null", mBean);
+
+ final int numberOfMessages = 2;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ producer.send(producerSession.createTextMessage("Test " + i));
+ }
+
+ Message m = consumer.receive(500l);
+ assertNull("Unexpected message received", m);
+
+ Number channelId = getFirstTransactedChannelId(mBean, 2);
+ mBean.commitTransactions(channelId.intValue());
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ m = consumer.receive(1000l);
+ assertNotNull("Message " + i + " is not received", m);
+ assertEquals("Unexpected message received at " + i, "Test " + i, ((TextMessage) m).getText());
+ }
+ producerSession.commit();
+ m = consumer.receive(500l);
+ assertNull("Unexpected message received", m);
+ }
+
+ protected Number getFirstTransactedChannelId(final ManagedConnection mBean, int channelNumber) throws IOException, JMException
+ {
+ TabularData channelsData = mBean.channels();
+ assertNotNull("Channels data are null", channelsData);
+ assertEquals("Unexpected number of rows in channel table", channelNumber, channelsData.size());
+ final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
+ while (rowItr.hasNext())
+ {
+ final CompositeDataSupport row = rowItr.next();
+ Boolean transacted = (Boolean) row.get(ManagedConnection.TRANSACTIONAL);
+ if (transacted.booleanValue())
+ {
+ return (Number) row.get(ManagedConnection.CHAN_ID);
+ }
+ }
+ return null;
+ }
+
+ public void testRollback() throws Exception
+ {
+ final String queueName = getTestQueueName();
+
+ final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Destination destination = producerSession.createQueue(queueName);
+ final MessageConsumer consumer = consumerSession.createConsumer(destination);
+ final MessageProducer producer = producerSession.createProducer(destination);
+
+ List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+ assertNotNull("Connection MBean is not found", connections);
+ assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+ final ManagedConnection mBean = connections.get(0);
+ assertNotNull("Connection MBean is null", mBean);
+
+ final int numberOfMessages = 2;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ producer.send(producerSession.createTextMessage("Test " + i));
+ }
+
+ Number channelId = getFirstTransactedChannelId(mBean, 2);
+ mBean.rollbackTransactions(channelId.intValue());
+
+ Message m = consumer.receive(1000l);
+ assertNull("Unexpected message received", m);
+
+ producerSession.commit();
+
+ _connection.start();
+ m = consumer.receive(1000l);
+ assertNull("Unexpected message received", m);
+ }
+
+ public void testAuthorisedId() throws Exception
+ {
+ List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+ assertNotNull("Connection MBean is not found", connections);
+ assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+ final ManagedConnection mBean = connections.get(0);
+ assertNotNull("Connection MBean is null", mBean);
+ assertEquals("Unexpected authorized id", "guest", mBean.getAuthorizedId());
+ }
+}
diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes
index e7718b982d..c687574409 100755
--- a/java/test-profiles/Java010Excludes
+++ b/java/test-profiles/Java010Excludes
@@ -42,10 +42,6 @@ org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted
org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend
-// 0-10 Broker does not have a JMX connection MBean
-org.apache.qpid.management.jmx.ManagementActorLoggingTest#testConnectionCloseViaManagement
-org.apache.qpid.management.jmx.MessageConnectionStatisticsTest#*
-
// 0-10 is not supported by the MethodRegistry
org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*