summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-02-06 15:33:15 +0000
committerKeith Wall <kwall@apache.org>2014-02-06 15:33:15 +0000
commitbd45ab84615df6b5c0751ec556984ec0cb58b690 (patch)
tree9a8f014312657fb2249156c8306aa0a65a55a20f
parent11143fc7fa2b018e511280d39ca08630176f363c (diff)
downloadqpid-python-bd45ab84615df6b5c0751ec556984ec0cb58b690.tar.gz
QPID-5409: Refactor BDB HA MBean to delegate the operations to the underlying node configured object.
Unregister BDB HA MBean on virtual host deletion or replication node removal git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1565308 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java122
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java16
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java3
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java145
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/VirtualHostMBeanTest.java76
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java52
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java60
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java23
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java33
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java76
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java17
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java36
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java3
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java33
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java105
15 files changed, 454 insertions, 346 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index 4f59ff3a7a..bc7a3298a5 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -19,8 +19,17 @@
*/
package org.apache.qpid.server.store.berkeleydb.jmx;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.ROLE;
+
import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
@@ -36,10 +45,13 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import org.apache.qpid.server.model.ConfiguredObjectFinder;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
/**
* Management mbean for BDB HA.
@@ -52,12 +64,13 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
private static final CompositeType GROUP_MEMBER_ROW;
private static final OpenType<?>[] GROUP_MEMBER_ATTRIBUTE_TYPES;
+
static
{
try
{
GROUP_MEMBER_ATTRIBUTE_TYPES = new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING};
- final String[] itemNames = new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT};
+ final String[] itemNames = new String[] {GRP_MEM_COL_NODE_NAME, GRP_MEM_COL_NODE_HOST_PORT};
final String[] itemDescriptions = new String[] {"Unique node name", "Node host / port "};
GROUP_MEMBER_ROW = new CompositeType("GroupMember", "Replication group member",
itemNames,
@@ -65,7 +78,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
GROUP_MEMBER_ATTRIBUTE_TYPES );
GROUP_MEMBERS_TABLE = new TabularType("GroupMembers", "Replication group memebers",
GROUP_MEMBER_ROW,
- new String[] {ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME});
+ new String[] {GRP_MEM_COL_NODE_NAME});
}
catch (final OpenDataException ode)
{
@@ -73,15 +86,24 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
- private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private final ReplicationNode _localReplicationNode;
private final String _objectName;
+ private final VirtualHost _parent;
+ private final String _virtualHostName;
- protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
+ public BDBHAMessageStoreManagerMBean(ReplicationNode localReplicationNode, ManagedObject parent) throws JMException
{
super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
- LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName);
- _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
- _objectName = ObjectName.quote(virtualHostName);
+
+ _localReplicationNode = localReplicationNode;
+ _virtualHostName = localReplicationNode.getParent(VirtualHost.class).getName();
+ _objectName = ObjectName.quote(_virtualHostName);
+ _parent = _localReplicationNode.getParent(VirtualHost.class);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + _localReplicationNode.getName());
+ }
register();
}
@@ -94,60 +116,44 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public String getGroupName()
{
- return _replicatedEnvironmentFacade.getGroupName();
+ return (String) _localReplicationNode.getAttribute(GROUP_NAME);
}
@Override
public String getNodeName()
{
- return _replicatedEnvironmentFacade.getNodeName();
+ return (String) _localReplicationNode.getName();
}
@Override
public String getNodeHostPort()
{
- return _replicatedEnvironmentFacade.getHostPort();
+ return (String) _localReplicationNode.getAttribute(HOST_PORT);
}
@Override
public String getHelperHostPort()
{
- return _replicatedEnvironmentFacade.getHelperHostPort();
+ return (String) _localReplicationNode.getAttribute(HELPER_HOST_PORT);
}
@Override
public String getDurability() throws IOException, JMException
{
- try
- {
- return _replicatedEnvironmentFacade.getDurability();
- }
- catch (RuntimeException e)
- {
- LOGGER.debug("Failed query replication policy", e);
- throw new JMException(e.getMessage());
- }
+ return (String) _localReplicationNode.getAttribute(DURABILITY);
}
@Override
public boolean getCoalescingSync() throws IOException, JMException
{
- return _replicatedEnvironmentFacade.isCoalescingSync();
+ return (Boolean)_localReplicationNode.getAttribute(COALESCING_SYNC);
}
@Override
public String getNodeState() throws IOException, JMException
{
- try
- {
- return _replicatedEnvironmentFacade.getNodeState();
- }
- catch (RuntimeException e)
- {
- LOGGER.debug("Failed query node state", e);
- throw new JMException(e.getMessage());
- }
+ return (String)_localReplicationNode.getAttribute(ROLE);
}
@Override
@@ -155,7 +161,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- return _replicatedEnvironmentFacade.isDesignatedPrimary();
+ return (Boolean)_localReplicationNode.getAttribute(DESIGNATED_PRIMARY);
}
catch (RuntimeException e)
{
@@ -167,12 +173,16 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public TabularData getAllNodesInGroup() throws IOException, JMException
{
- final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
- final List<Map<String, String>> members = _replicatedEnvironmentFacade.getGroupMembers();
+ Collection<ReplicationNode> allNodes = _parent.getChildren(ReplicationNode.class);
- for (Map<String, String> map : members)
+ final TabularDataSupport data = new TabularDataSupport(GROUP_MEMBERS_TABLE);
+ for (ReplicationNode replicationNode : allNodes)
{
- CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, map);
+ Map<String, String> nodeMap = new HashMap<String, String>();
+ nodeMap.put(GRP_MEM_COL_NODE_NAME, replicationNode.getName());
+ nodeMap.put(GRP_MEM_COL_NODE_HOST_PORT, (String)replicationNode.getAttribute(HOST_PORT));
+
+ CompositeData memberData = new CompositeDataSupport(GROUP_MEMBER_ROW, nodeMap);
data.put(memberData);
}
return data;
@@ -181,14 +191,26 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public void removeNodeFromGroup(String nodeName) throws JMException
{
+ // find the replication node object, set the desired state
+ Collection<ReplicationNode> allNodes = _parent.getChildren(ReplicationNode.class);
+ ReplicationNode targetNode = ConfiguredObjectFinder.findConfiguredObjectByName(allNodes, nodeName);
+
+ if (targetNode == null)
+ {
+ throw new JMException("Failed to find replication node with name '" + nodeName + "'.");
+ }
try
{
- _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
+ State newState = targetNode.setDesiredState(targetNode.getActualState(), State.DELETED);
+ if (newState != State.DELETED)
+ {
+ throw new JMException("Failed to delete replication node with name '" + nodeName + "'. New unexpectedly state is " + newState);
+ }
}
- catch (Exception e)
+ catch(IllegalStateTransitionException e)
{
- LOGGER.error("Failed to remove node " + nodeName + " from group", e);
- throw new JMException(e.getMessage());
+ LOGGER.error("Cannot remove node '" + nodeName + "' from the group", e);
+ throw new JMException("Cannot remove node '" + nodeName + "' from the group:" + e.getMessage());
}
}
@@ -197,11 +219,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
try
{
- _replicatedEnvironmentFacade.setDesignatedPrimary(primary);
+ _localReplicationNode.setAttribute(DESIGNATED_PRIMARY, _localReplicationNode.getAttribute(DESIGNATED_PRIMARY), primary);
}
- catch (AMQStoreException e)
+ catch (Exception e)
{
- LOGGER.error("Failed to set node " + _replicatedEnvironmentFacade.getNodeName() + " as designated primary", e);
+ LOGGER.error("Failed to set node " + _localReplicationNode.getName() + " to designated primary : " + primary, e);
throw new JMException(e.getMessage());
}
}
@@ -209,15 +231,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
@Override
public void updateAddress(String nodeName, String newHostName, int newPort) throws JMException
{
- try
- {
- _replicatedEnvironmentFacade.updateAddress(nodeName, newHostName, newPort);
- }
- catch(AMQStoreException e)
- {
- LOGGER.error("Failed to update address for node " + nodeName + " to " + newHostName + ":" + newPort, e);
- throw new JMException(e.getMessage());
- }
+ throw new UnsupportedOperationException("Unsupported operation. Delete the node then add a new node in its place.");
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
index 16199d30a3..1c362c3023 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
@@ -21,14 +21,12 @@
package org.apache.qpid.server.store.berkeleydb.jmx;
import javax.management.JMException;
-import javax.management.StandardMBean;
import org.apache.log4j.Logger;
import org.apache.qpid.server.jmx.MBeanProvider;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
/**
@@ -48,24 +46,20 @@ public class BDBHAMessageStoreManagerMBeanProvider implements MBeanProvider
@Override
public boolean isChildManageableByMBean(ConfiguredObject child)
{
- return (child instanceof VirtualHost
- && ReplicatedEnvironmentFacade.TYPE.equals(child.getAttribute(VirtualHost.STORE_TYPE)));
+ return (child instanceof LocalReplicationNode);
}
@Override
- public StandardMBean createMBean(ConfiguredObject child, StandardMBean parent) throws JMException
+ public ManagedObject createMBean(ConfiguredObject child, ManagedObject parent) throws JMException
{
- VirtualHost virtualHostChild = (VirtualHost) child;
-
- BDBMessageStore messageStore = (BDBMessageStore) virtualHostChild.getMessageStore();
+ LocalReplicationNode localReplicationNode = (LocalReplicationNode) child;
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Creating mBean for child " + child);
}
- ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
- return new BDBHAMessageStoreManagerMBean(virtualHostChild.getName(), replicatedEnvironmentFacade, (ManagedObject) parent);
+ return new BDBHAMessageStoreManagerMBean(localReplicationNode, parent);
}
@Override
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
index b85e44526b..fc1cd0801a 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ManagedBDBHAMessageStore.java
@@ -41,6 +41,9 @@ public interface ManagedBDBHAMessageStore
public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
public static final String ATTR_COALESCING_SYNC = "CoalescingSync";
+ public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
+ public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+
@MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
String getGroupName() throws IOException, JMException;
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index af017ed812..b5e01732dd 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -19,15 +19,21 @@
*/
package org.apache.qpid.server.store.berkeleydb.jmx;
-import static org.mockito.Mockito.doThrow;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.ROLE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.jar.JarException;
import javax.management.JMException;
import javax.management.ObjectName;
@@ -37,17 +43,15 @@ import javax.management.openmbean.TabularData;
import junit.framework.TestCase;
-import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
-import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean;
-import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
-
-import com.sleepycat.je.DatabaseException;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
@@ -57,11 +61,12 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
private static final String TEST_HELPER_HOST_PORT = "host:5678";
private static final String TEST_DURABILITY = "sync,sync,all";
private static final String TEST_NODE_STATE = "MASTER";
- private static final String TEST_STORE_NAME = "testStoreName";
private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
+ private static final String TEST_VHOST_NAME = "test";
- private ReplicatedEnvironmentFacade _replicatedEnvironmentFacadee;
private BDBHAMessageStoreManagerMBean _mBean;
+ private VirtualHost _virtualHost;
+ private ReplicationNode _localReplicationNode;
private AMQManagedObject _mBeanParent;
@Override
@@ -70,10 +75,16 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
super.setUp();
CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
- _replicatedEnvironmentFacadee = mock(ReplicatedEnvironmentFacade.class);
+ _localReplicationNode = mock(ReplicationNode.class);
+ _virtualHost = mock(VirtualHost.class);
_mBeanParent = mock(AMQManagedObject.class);
when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
- _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacadee, _mBeanParent);
+ when(_localReplicationNode.getParent(VirtualHost.class)).thenReturn(_virtualHost);
+
+ when(_localReplicationNode.getName()).thenReturn(TEST_NODE_NAME);
+ when(_virtualHost.getName()).thenReturn(TEST_VHOST_NAME);
+
+ _mBean = new BDBHAMessageStoreManagerMBean(_localReplicationNode, _mBeanParent);
}
@Override
@@ -85,101 +96,124 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
public void testObjectName() throws Exception
{
- String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
+ String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_VHOST_NAME);
assertEquals(expectedObjectName, _mBean.getObjectName().toString());
}
public void testGroupName() throws Exception
{
- when(_replicatedEnvironmentFacadee.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(_localReplicationNode.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME);
assertEquals(TEST_GROUP_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_GROUP_NAME));
}
public void testNodeName() throws Exception
{
- when(_replicatedEnvironmentFacadee.getNodeName()).thenReturn(TEST_NODE_NAME);
-
assertEquals(TEST_NODE_NAME, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_NAME));
}
public void testNodeHostPort() throws Exception
{
- when(_replicatedEnvironmentFacadee.getHostPort()).thenReturn(TEST_NODE_HOST_PORT);
+ when(_localReplicationNode.getAttribute(HOST_PORT)).thenReturn(TEST_NODE_HOST_PORT);
assertEquals(TEST_NODE_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_HOST_PORT));
}
public void testHelperHostPort() throws Exception
{
- when(_replicatedEnvironmentFacadee.getHelperHostPort()).thenReturn(TEST_HELPER_HOST_PORT);
+ when(_localReplicationNode.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_HELPER_HOST_PORT);
assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
}
public void testDurability() throws Exception
{
- when(_replicatedEnvironmentFacadee.getDurability()).thenReturn(TEST_DURABILITY);
+ when(_localReplicationNode.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
}
public void testCoalescingSync() throws Exception
{
- when(_replicatedEnvironmentFacadee.isCoalescingSync()).thenReturn(true);
+ when(_localReplicationNode.getAttribute(COALESCING_SYNC)).thenReturn(true);
assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
}
public void testNodeState() throws Exception
{
- when(_replicatedEnvironmentFacadee.getNodeState()).thenReturn(TEST_NODE_STATE);
+ when(_localReplicationNode.getAttribute(ROLE)).thenReturn(TEST_NODE_STATE);
assertEquals(TEST_NODE_STATE, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_NODE_STATE));
}
public void testDesignatedPrimaryFlag() throws Exception
{
- when(_replicatedEnvironmentFacadee.isDesignatedPrimary()).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
+ when(_localReplicationNode.getAttribute(DESIGNATED_PRIMARY)).thenReturn(TEST_DESIGNATED_PRIMARY_FLAG);
assertEquals(TEST_DESIGNATED_PRIMARY_FLAG, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DESIGNATED_PRIMARY));
}
public void testGroupMembersForGroupWithOneNode() throws Exception
{
- List<Map<String, String>> members = Collections.singletonList(createTestNodeResult());
- when(_replicatedEnvironmentFacadee.getGroupMembers()).thenReturn(members);
+ ReplicationNode remoteNode = mock(ReplicationNode.class);
+ when(remoteNode.getName()).thenReturn("remotenode");
+ when(remoteNode.getAttribute(HOST_PORT)).thenReturn("remotehost:port");
+
+ when(_localReplicationNode.getAttribute(HOST_PORT)).thenReturn(TEST_NODE_HOST_PORT);
+
+ Collection<ReplicationNode> nodes = new ArrayList<ReplicationNode>();
+ nodes.add(_localReplicationNode);
+ nodes.add(remoteNode);
+ when(_virtualHost.getChildren(ReplicationNode.class)).thenReturn(nodes);
final TabularData resultsTable = _mBean.getAllNodesInGroup();
- assertTableHasHeadingsNamed(resultsTable, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT);
+ assertTableHasHeadingsNamed(resultsTable, BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME, BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT);
final int numberOfDataRows = resultsTable.size();
- assertEquals("Unexpected number of data rows", 1 ,numberOfDataRows);
- final CompositeData row = (CompositeData) resultsTable.values().iterator().next();
- assertEquals(TEST_NODE_NAME, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME));
- assertEquals(TEST_NODE_HOST_PORT, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals("Unexpected number of data rows", 2 ,numberOfDataRows);
+ Iterator<?> iterator = resultsTable.values().iterator();
+
+ final CompositeData firstRow = (CompositeData) iterator.next();
+ assertEquals(TEST_NODE_NAME, firstRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME));
+ assertEquals(TEST_NODE_HOST_PORT, firstRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT));
+
+ final CompositeData secondRow = (CompositeData) iterator.next();
+ assertEquals("remotenode", secondRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_NAME));
+ assertEquals("remotehost:port", secondRow.get(BDBHAMessageStoreManagerMBean.GRP_MEM_COL_NODE_HOST_PORT));
+
}
public void testRemoveNodeFromReplicationGroup() throws Exception
{
+ Collection<ReplicationNode> nodes = new ArrayList<ReplicationNode>();
+ nodes.add(_localReplicationNode);
+ when(_virtualHost.getChildren(ReplicationNode.class)).thenReturn(nodes);
+ when(_localReplicationNode.getActualState()).thenReturn(State.ACTIVE);
+ when(_localReplicationNode.setDesiredState(State.ACTIVE, State.DELETED)).thenReturn(State.DELETED);
+
_mBean.removeNodeFromGroup(TEST_NODE_NAME);
- verify(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
+ verify(_localReplicationNode).setDesiredState(State.ACTIVE, State.DELETED);
}
- public void testRemoveNodeFromReplicationGroupWithError() throws Exception
+ public void testRemoveNodeFromReplicationGroupOnIllegalStateTransitionException() throws Exception
{
- doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
+ Collection<ReplicationNode> nodes = new ArrayList<ReplicationNode>();
+ nodes.add(_localReplicationNode);
+ when(_virtualHost.getChildren(ReplicationNode.class)).thenReturn(nodes);
+ when(_localReplicationNode.getActualState()).thenReturn(State.ACTIVE);
+ when(_localReplicationNode.setDesiredState(State.ACTIVE, State.DELETED)).thenThrow(new IllegalStateTransitionException());
try
{
_mBean.removeNodeFromGroup(TEST_NODE_NAME);
- fail("Exception not thrown");
+ fail("Should throw JM Exception on IllegalStateTransitionException");
}
- catch (JMException je)
+ catch(JMException e)
{
- // PASS
+ //pass
}
}
@@ -187,32 +221,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
_mBean.setDesignatedPrimary(true);
- verify(_replicatedEnvironmentFacadee).setDesignatedPrimary(true);
- }
-
- public void testSetAsDesignatedPrimaryWithError() throws Exception
- {
- doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).setDesignatedPrimary(true);
-
- try
- {
- _mBean.setDesignatedPrimary(true);
- fail("Exception not thrown");
- }
- catch (JMException je)
- {
- // PASS
- }
- }
-
- public void testUpdateAddress() throws Exception
- {
- String newHostName = "newHostName";
- int newPort = 1967;
-
- _mBean.updateAddress(TEST_NODE_NAME, newHostName, newPort);
-
- verify(_replicatedEnvironmentFacadee).updateAddress(TEST_NODE_NAME, newHostName, newPort);
+ verify(_localReplicationNode).setAttribute(DESIGNATED_PRIMARY, null, true);
}
private void assertTableHasHeadingsNamed(final TabularData resultsTable, String... headingNames)
@@ -223,12 +232,4 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
assertTrue("Table should have column with heading " + headingName, headingsRow.containsKey(headingName));
}
}
-
- private Map<String, String> createTestNodeResult()
- {
- Map<String, String> items = new HashMap<String, String>();
- items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
- items.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
- return items;
- }
}
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/VirtualHostMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/VirtualHostMBeanTest.java
new file mode 100644
index 0000000000..9995e9a02a
--- /dev/null
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/VirtualHostMBeanTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.jmx;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.jmx.DefaultManagedObject;
+import org.apache.qpid.server.jmx.ManagedObjectRegistry;
+import org.apache.qpid.server.jmx.mbeans.VirtualHostMBean;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
+
+public class VirtualHostMBeanTest extends TestCase
+{
+ private VirtualHost _mockVirtualHost;
+ private ManagedObjectRegistry _mockManagedObjectRegistry;
+ private VirtualHostMBean _virtualHostMBean;
+ private LocalReplicationNode _mockLocalReplicationNode;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ _mockVirtualHost = mock(VirtualHost.class);
+ _mockManagedObjectRegistry = mock(ManagedObjectRegistry.class);
+ _mockLocalReplicationNode = mock(LocalReplicationNode.class);
+ when(_mockLocalReplicationNode.getParent(VirtualHost.class)).thenReturn(_mockVirtualHost);
+ when(_mockVirtualHost.getName()).thenReturn("vhost");
+
+ _virtualHostMBean = new VirtualHostMBean(_mockVirtualHost, _mockManagedObjectRegistry);
+ }
+
+ public void testAdditionalMbeanRegistered_LocalReplicationNodeAdded() throws Exception
+ {
+ _virtualHostMBean.childAdded(_mockVirtualHost, _mockLocalReplicationNode);
+
+ verify(_mockManagedObjectRegistry, times(2)).registerObject(any(DefaultManagedObject.class));
+ }
+
+ public void testAdditionalMbeanUnregisteredOnUnregisterOfThisMbean() throws Exception
+ {
+ _virtualHostMBean.childAdded(_mockVirtualHost, _mockLocalReplicationNode);
+ _virtualHostMBean.unregister();
+
+ verify(_mockManagedObjectRegistry, times(2)).unregisterObject(any(DefaultManagedObject.class));
+ }
+
+ public void testAdditionalMbeanUnregistered_LocalReplicationNodeRemoved() throws Exception
+ {
+ _virtualHostMBean.childAdded(_mockVirtualHost, _mockLocalReplicationNode);
+
+ _virtualHostMBean.childRemoved(_mockVirtualHost, _mockLocalReplicationNode);
+ verify(_mockManagedObjectRegistry).unregisterObject(any(BDBHAMessageStoreManagerMBean.class));
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
index 9ecef6c8c5..de301b91ba 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
@@ -29,11 +29,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.State;
@@ -44,6 +46,7 @@ import org.apache.qpid.server.model.adapter.AbstractAdapter;
import org.apache.qpid.server.model.adapter.NoStatistics;
import org.apache.qpid.server.util.MapValueConverter;
+import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
@@ -70,6 +73,8 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
private volatile long _joinTime;
private volatile long _lastTransactionId;
+ private final AtomicReference<State> _state;
+
public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, VirtualHost virtualHost,
TaskExecutor taskExecutor, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
{
@@ -79,6 +84,7 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
_hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort();
_replicationNode = replicationNode;
_replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+ _state = new AtomicReference<State>(State.ACTIVE);
}
@Override
@@ -102,7 +108,7 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
@Override
public State getActualState()
{
- return State.UNAVAILABLE;
+ return _state.get();
}
@Override
@@ -158,27 +164,29 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
@Override
protected boolean setState(State currentState, State desiredState)
{
- if (desiredState == State.STOPPED)
- {
- return true;
- }
- else if (desiredState == State.DELETED)
+ //TODO: Need to decide how to display STOPPED state on a remote node when a corresponding local node is in STOPPED state
+ if (desiredState == State.DELETED)
{
- if (ReplicatedEnvironment.State.REPLICA.name().equals(getAttribute(ROLE)) )
+ String nodeName = getName();
+
+ if (LOGGER.isDebugEnabled())
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Deleting node " + _groupName + ":" + getName());
- }
- try
- {
- _replicatedEnvironmentFacade.removeNodeFromGroup(getName());
- return true;
- }
- catch (Exception e)
- {
- LOGGER.warn("Failure to remove node remotely", e);
- }
+ LOGGER.debug("Deleting node '" + nodeName + "' from group '" + _groupName + "'");
+ }
+
+ try
+ {
+ _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
+ _state.set(State.DELETED);
+ return true;
+ }
+ catch(MasterStateException e)
+ {
+ throw new IllegalStateTransitionException("Node '" + nodeName + "' cannot be deleted when role is a master");
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
}
}
return false;
@@ -191,11 +199,11 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
{
return getId();
}
- else if (ReplicationNode.LIFETIME_POLICY.equals(name))
+ else if (LIFETIME_POLICY.equals(name))
{
return getLifetimePolicy();
}
- else if (ReplicationNode.DURABLE.equals(name))
+ else if (DURABLE.equals(name))
{
return isDurable();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index a490710187..97fae5e76b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -20,16 +20,24 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
-import static org.apache.qpid.server.model.ReplicationNode.*;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.PRIORITY;
+import static org.apache.qpid.server.model.ReplicationNode.QUORUM_OVERRIDE;
+import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -60,7 +68,6 @@ import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
-import com.sleepycat.je.OperationFailureException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.InsufficientReplicasException;
@@ -139,9 +146,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public static final String TYPE = "BDB-HA";
- // TODO: JMX will change to observe the model, at that point these names will disappear
- public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
- public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
private final LocalReplicationNode _replicationNode;
private final Durability _durability;
@@ -445,21 +449,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return _environment.getRepMutableConfig().getDesignatedPrimary();
}
- public List<Map<String, String>> getGroupMembers()
- {
- List<Map<String, String>> members = new ArrayList<Map<String, String>>();
-
- for (ReplicationNode node : _environment.getGroup().getNodes())
- {
- Map<String, String> nodeMap = new HashMap<String, String>();
- nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, node.getName());
- nodeMap.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort());
- members.add(nodeMap);
- }
-
- return members;
- }
-
public void removeNodeFromGroup(final String nodeName)
{
createReplicationGroupAdmin().removeMember(nodeName);
@@ -487,26 +476,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- public void updateAddress(final String nodeName, final String newHostName, final int newPort) throws AMQStoreException
- {
- try
- {
- createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort);
- }
- catch (OperationFailureException ofe)
- {
- // TODO: I am not sure about the exception handing here
- throw new AMQStoreException("Failed to update address for '" + nodeName + "' with new host " + newHostName
- + " and new port " + newPort + ". " + ofe.getMessage(), ofe);
- }
- catch (DatabaseException e)
- {
- // TODO: I am not sure about the exception handing here
- throw handleDatabaseException("Failed to update address for '" + nodeName + "' with new host " + newHostName
- + " and new port " + newPort + ". " + e.getMessage(), e);
- }
- }
-
int getPriority()
{
ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
@@ -958,6 +927,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return new DbPing(repNode, (String)_replicationNode.getAttribute(GROUP_NAME), DB_PING_SOCKET_TIMEOUT).getNodeState();
}
+ // For testing only
+ int getNumberOfElectableGroupMembers()
+ {
+ return _environment.getGroup().getElectableNodes().size();
+ }
+
private final class GroupChangeLearner implements Runnable
{
@Override
@@ -1135,4 +1110,5 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
index 080bd67bd3..25c58e47a5 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
@@ -14,6 +14,7 @@ import static org.apache.qpid.server.model.ReplicationNode.ROLE;
import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -24,9 +25,11 @@ import java.util.Map.Entry;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationNode;
@@ -103,6 +106,26 @@ public class RemoteReplicationNodeTest extends QpidTestCase
verify(_replicatedEnvironmentFacade, never()).transferMasterAsynchronously(_nodeName);
}
+ public void testSetDesiredStateToDeleted() throws Exception
+ {
+ _node.setDesiredState(_node.getActualState(), org.apache.qpid.server.model.State.DELETED);
+ verify(_replicatedEnvironmentFacade).removeNodeFromGroup(_nodeName);
+ }
+
+ public void testSetDesiredStateToDeletedOnMasterStateException() throws Exception
+ {
+ doThrow(new MasterStateException("mocked exception")).when(_replicatedEnvironmentFacade).removeNodeFromGroup(_nodeName);
+ try
+ {
+ _node.setDesiredState(_node.getActualState(), org.apache.qpid.server.model.State.DELETED);
+ fail("Exception not thrown");
+ }
+ catch(IllegalStateTransitionException e)
+ {
+ // pass
+ }
+ }
+
public void testSetImmutableAttributesThrowException() throws Exception
{
Map<String, Object> changeAttributeMap = new HashMap<String, Object>();
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 7648f13a9c..45117b17c2 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -216,15 +216,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
}
- public void testGetGroupMembers() throws Exception
- {
- List<Map<String, String>> groupMembers = createMaster().getGroupMembers();
- Map<String, String> expectedMember = new HashMap<String, String>();
- expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
- expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
- Set<Map<String, String>> expectedGroupMembers = Collections.singleton(expectedMember);
- assertEquals("Unexpected group members", expectedGroupMembers, new HashSet<Map<String, String>>(groupMembers));
- }
public void testPriority() throws Exception
{
@@ -274,8 +265,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
addReplica(nodeName2, node2NodeHostPort, listener);
- List<Map<String, String>> groupMembers = master.getGroupMembers();
- assertEquals("Unexpected number of nodes", 2, groupMembers.size());
+ assertEquals("Unexpected number of nodes", 2, master.getNumberOfElectableGroupMembers());
assertTrue("Listener not fired within timeout", nodeRecoveryLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
@@ -299,8 +289,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = addNode(State.MASTER, stateChangeListener, listener);
assertTrue("Master was not started", stateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
- assertEquals("Unexpected number of nodes at start of test", 1, initialGroupMembers.size());
+ assertEquals("Unexpected number of nodes at start of test", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
@@ -308,8 +297,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertTrue("Listener not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
- assertEquals("Unexpected number of nodes", 2, groupMembers.size());
+ assertEquals("Unexpected number of nodes", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
}
@@ -349,8 +337,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
addReplica(node2Name, node2NodeHostPort);
- List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
- assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
+ assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
// Need to await the listener hearing the addition of the node to the model.
assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
@@ -360,8 +347,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertTrue("Node delete not fired within timeout", nodeDeletedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
- List<Map<String, String>> groupMembers = replicatedEnvironmentFacade.getGroupMembers();
- assertEquals("Unexpected number of nodes after node removal", 1, groupMembers.size());
+ assertEquals("Unexpected number of nodes after node removal", 1, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertEquals("Unexpected number of listener invocations", 1, invocationCount.get());
}
@@ -389,8 +375,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
String node2NodeHostPort = "localhost" + ":" + getNextAvailable(TEST_NODE_PORT + 1);
addReplica(node2Name, node2NodeHostPort);
- List<Map<String, String>> initialGroupMembers = replicatedEnvironmentFacade.getGroupMembers();
- assertEquals("Unexpected number of nodes at start of test", 2, initialGroupMembers.size());
+ assertEquals("Unexpected number of nodes at start of test", 2, replicatedEnvironmentFacade.getNumberOfElectableGroupMembers());
assertTrue("Node add not fired within timeout", nodeAddedLatch.await(LISTENER_TIMEOUT, TimeUnit.SECONDS));
@@ -417,13 +402,11 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
ReplicatedEnvironmentFacade ref2 = addReplica(node2Name, node2NodeHostPort);
- List<Map<String, String>> groupMembers = environmentFacade.getGroupMembers();
- assertEquals("Unexpected group members count", 2, groupMembers.size());
+ assertEquals("Unexpected group members count", 2, environmentFacade.getNumberOfElectableGroupMembers());
ref2.close();
environmentFacade.removeNodeFromGroup(node2Name);
- groupMembers = environmentFacade.getGroupMembers();
- assertEquals("Unexpected group members count", 1, groupMembers.size());
+ assertEquals("Unexpected group members count", 1, environmentFacade.getNumberOfElectableGroupMembers());
}
public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index 6bddf5876f..057940c036 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -38,12 +38,9 @@ import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import com.sleepycat.je.EnvironmentFailureException;
-
/**
* System test verifying the ability to control a cluster via the Management API.
*
@@ -144,11 +141,11 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
CompositeData row = groupMembers.get(new Object[] {nodeName});
assertNotNull("Table does not contain row for node name " + nodeName, row);
- assertEquals(nodeHostPort, row.get(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT));
+ assertEquals(nodeHostPort, row.get(ManagedBDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
}
}
- public void testRemoveNodeFromGroup() throws Exception
+ public void testRemoveRemoteNodeFromGroup() throws Exception
{
final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
@@ -156,72 +153,19 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);
- final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
+ final String removedNodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPortNumberToBeRemoved);
_clusterCreator.stopNode(brokerPortNumberToBeRemoved);
- storeBean.removeNodeFromGroup(removedNodeName);
-
- final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
- assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
- }
-
- /**
- * Updates the address of a node.
- *
- * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
- * assert.
- *
- * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
- */
- public void _testUpdateAddress() throws Exception
- {
- final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
- final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
- final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
- final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);
-
- _clusterCreator.stopNode(brokerPortNumberToBeMoved);
-
- final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
- final int newBdbPort = getNextAvailable(oldBdbPort + 1);
-
- storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);
-//TODO
- //_clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
-
- _clusterCreator.startNode(brokerPortNumberToBeMoved);
- }
- /**
- * @see #testUpdateAddress()
- */
- public void _testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
- {
- final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
- final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
-
- _clusterCreator.stopNode(brokerPortNumberToBeMoved);
-
- final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
- final int newBdbPort = getNextAvailable(oldBdbPort + 1);
-
- // now deliberately don't call updateAddress
-//TODO
- //_clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);
+ storeBean.removeNodeFromGroup(removedNodeName);
- try
+ long limitTime = System.currentTimeMillis() + 5000;
+ while((NUMBER_OF_NODES == storeBean.getAllNodesInGroup().size()) && System.currentTimeMillis() < limitTime)
{
- _clusterCreator.startNode(brokerPortNumberToBeMoved);
- fail("Exception not thrown");
- }
- catch(RuntimeException rte)
- {
- //check cause was BDBs EnvironmentFailureException
- assertTrue("Message '"+rte.getMessage()+"' does not contain '"
- + EnvironmentFailureException.class.getName()
- + "'.",
- rte.getMessage().contains(EnvironmentFailureException.class.getName()));
- // PASS
+ Thread.sleep(100l);
}
+
+ int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
+ assertEquals("Unexpected number of data rows after test", NUMBER_OF_NODES - 1, numberOfDataRowsAfterRemoval);
}
public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 824eaf0a3d..3674f59bba 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -414,7 +414,20 @@ public class HATestClusterCreator
public Map<String, Object> getReplicationNodeAttributes(int brokerPort, String replicationNodeName) throws IOException
{
RestTestHelper restHelper = createRestTestHelper(brokerPort);
- return restHelper.getJsonAsSingletonList("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName );
+ List<Map<String, Object>> results= restHelper.getJsonAsList("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName );
+ int size = results.size();
+ if (size == 0)
+ {
+ return Collections.emptyMap();
+ }
+ else if (size == 1)
+ {
+ return results.get(0);
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected number of nodes " + size);
+ }
}
private RestTestHelper createRestTestHelper(int brokerPort)
@@ -430,7 +443,7 @@ public class HATestClusterCreator
while(!desiredRole.equals(data.get(ReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000)
{
- LOGGER.debug("Awaiting node to transit into " + desiredRole + " role");
+ LOGGER.debug("Awaiting node '" + nodeName + "' to transit into " + desiredRole + " role");
data = getReplicationNodeAttributes(brokerPort, nodeName);
if (!desiredRole.equals(data.get(ReplicationNode.ROLE)))
{
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
index 4f434f2cd4..44421c721a 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
@@ -35,6 +35,7 @@ import javax.management.JMException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.jmx.mbeans.LoggingManagementMBean;
+import org.apache.qpid.server.jmx.mbeans.MBeanUtils;
import org.apache.qpid.server.jmx.mbeans.UserManagementMBean;
import org.apache.qpid.server.jmx.mbeans.ServerInformationMBean;
import org.apache.qpid.server.jmx.mbeans.Shutdown;
@@ -52,7 +53,6 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AbstractPluginAdapter;
import org.apache.qpid.server.plugin.PluginFactory;
-import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.util.MapValueConverter;
public class JMXManagement extends AbstractPluginAdapter implements ConfigurationChangeListener
@@ -183,7 +183,9 @@ public class JMXManagement extends AbstractPluginAdapter implements Configuratio
{
LOGGER.debug("Check for additional MBeans for virtual host:" + virtualHost.getName());
}
- createAdditionalMBeansFromProviders(virtualHost, mbean);
+
+ _children.put(virtualHost, mbean);
+ MBeanUtils.createAdditionalMBeansFromProviders(virtualHost, mbean);
}
}
Collection<AuthenticationProvider> authenticationProviders = broker.getAuthenticationProviders();
@@ -273,7 +275,10 @@ public class JMXManagement extends AbstractPluginAdapter implements Configuratio
if (mbean != null)
{
- createAdditionalMBeansFromProviders(child, mbean);
+ _children.put(child, mbean);
+ MBeanUtils.createAdditionalMBeansFromProviders(child, mbean);
+ // TODO track the mbeans that have been created on behalf of a child in a map, then
+ // if the child is ever removed, destroy these beans too.
}
}
catch(Exception e)
@@ -311,31 +316,6 @@ public class JMXManagement extends AbstractPluginAdapter implements Configuratio
// no-op
}
- private void createAdditionalMBeansFromProviders(ConfiguredObject child, AMQManagedObject mbean) throws JMException
- {
- _children.put(child, mbean);
-
- QpidServiceLoader<MBeanProvider> qpidServiceLoader = new QpidServiceLoader<MBeanProvider>();
- for (MBeanProvider provider : qpidServiceLoader.instancesOf(MBeanProvider.class))
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Consulting mbean provider : " + provider + " for child : " + child);
- }
-
- if (provider.isChildManageableByMBean(child))
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Provider will create mbean");
- }
- provider.createMBean(child, mbean);
- // TODO track the mbeans that have been created on behalf of a child in a map, then
- // if the child is ever removed, destroy these beans too.
- }
- }
- }
-
@Override
public String getName()
{
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java
index a0ef052314..941aac1a2a 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanProvider.java
@@ -22,7 +22,6 @@
package org.apache.qpid.server.jmx;
import javax.management.JMException;
-import javax.management.StandardMBean;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.Pluggable;
@@ -47,6 +46,6 @@ public interface MBeanProvider extends Pluggable
*
* @return newly created mbean
*/
- StandardMBean createMBean(ConfiguredObject child, StandardMBean parent) throws JMException;
+ ManagedObject createMBean(ConfiguredObject child, ManagedObject parent) throws JMException;
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java
index 97e84d4796..1ddcae33a8 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/MBeanUtils.java
@@ -20,15 +20,28 @@
*/
package org.apache.qpid.server.jmx.mbeans;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.management.JMException;
import javax.management.OperationsException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.jmx.AMQManagedObject;
+import org.apache.qpid.server.jmx.MBeanProvider;
+import org.apache.qpid.server.jmx.ManagedObject;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
public class MBeanUtils
{
+ private static final Logger LOGGER = Logger.getLogger(MBeanUtils.class);
+
public static Queue findQueueFromQueueName(VirtualHost virtualHost, String queueName) throws OperationsException
{
Queue queue = ConfiguredObjectFinder.findConfiguredObjectByName(virtualHost.getQueues(), queueName);
@@ -54,4 +67,24 @@ public class MBeanUtils
return exchange;
}
}
+
+ public static Collection<ManagedObject> createAdditionalMBeansFromProviders(ConfiguredObject child, AMQManagedObject mbean) throws JMException
+ {
+ List<ManagedObject> mbeans = new ArrayList<ManagedObject>();
+ QpidServiceLoader<MBeanProvider> qpidServiceLoader = new QpidServiceLoader<MBeanProvider>();
+ for (MBeanProvider provider : qpidServiceLoader.instancesOf(MBeanProvider.class))
+ {
+ if (provider.isChildManageableByMBean(child))
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Provider of type " + provider.getType() + " will create MBean for child : " + child);
+ }
+ ManagedObject childMBean = provider.createMBean(child, mbean);
+ mbeans.add(childMBean);
+ }
+ }
+ return mbeans;
+ }
+
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java
index e9e3e1df49..fa907552fd 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java
@@ -21,6 +21,14 @@
package org.apache.qpid.server.jmx.mbeans;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.ObjectName;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.jmx.AMQManagedObject;
import org.apache.qpid.server.jmx.ManagedObject;
@@ -29,26 +37,26 @@ import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.virtualhost.ManagedVirtualHost;
-import javax.management.JMException;
-import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost, ConfigurationChangeListener
{
private static final Logger LOGGER = Logger.getLogger(VirtualHostMBean.class);
private final VirtualHost _virtualHost;
+ private final Object _childrenLock = new Object();
+
private final Map<ConfiguredObject, AMQManagedObject> _children =
new HashMap<ConfiguredObject, AMQManagedObject>();
+
+ private final Map<ConfiguredObject, Collection<ManagedObject>> _additionalChildren =
+ new HashMap<ConfiguredObject, Collection<ManagedObject>>();
+
private VirtualHostManagerMBean _managerMBean;
public VirtualHostMBean(VirtualHost virtualHost, ManagedObjectRegistry registry) throws JMException
@@ -61,13 +69,48 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
initExchanges();
initConnections();
+ initAdditionalMbeansForAllChildren();
+
//This is the actual JMX bean for this 'VirtualHostMBean', leave it alone.
_managerMBean = new VirtualHostManagerMBean(this);
}
+ private void initAdditionalMbeansForAllChildren()
+ {
+ synchronized (_childrenLock)
+ {
+ Collection<Class<? extends ConfiguredObject>> childrenTypes = Model.getInstance().getChildTypes(VirtualHost.class);
+ for (Class<? extends ConfiguredObject> childType : childrenTypes)
+ {
+ Collection<? extends ConfiguredObject> children = _virtualHost.getChildren(childType);
+ for (ConfiguredObject child : children)
+ {
+ createAdditionalMBeans(child);
+ }
+ }
+ }
+ }
+
+ private void createAdditionalMBeans(ConfiguredObject child)
+ {
+ try
+ {
+ Collection<ManagedObject> mbeans = MBeanUtils.createAdditionalMBeansFromProviders(child, this);
+ if (!mbeans.isEmpty())
+ {
+ _additionalChildren.put(child, mbeans);
+ }
+ }
+ catch(Exception e)
+ {
+ LOGGER.error("Cannot create mbeans for the child " + child.getName(), e);
+ }
+ }
+
+
private void initQueues()
{
- synchronized (_children)
+ synchronized (_childrenLock)
{
for(Queue queue : _virtualHost.getQueues())
{
@@ -88,7 +131,7 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
private void initExchanges()
{
- synchronized (_children)
+ synchronized (_childrenLock)
{
for(Exchange exchange : _virtualHost.getExchanges())
{
@@ -109,7 +152,7 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
private void initConnections()
{
- synchronized (_children)
+ synchronized (_childrenLock)
{
for(Connection conn : _virtualHost.getConnections())
{
@@ -145,7 +188,7 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
public void childAdded(ConfiguredObject object, ConfiguredObject child)
{
- synchronized (_children)
+ synchronized (_childrenLock)
{
try
{
@@ -153,36 +196,29 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
{
QueueMBean queueMB = new QueueMBean((Queue)child, this);
_children.put(child, queueMB);
-
}
else if(child instanceof Exchange)
{
ExchangeMBean exchangeMBean = new ExchangeMBean((Exchange)child, this);
_children.put(child, exchangeMBean);
-
}
else if(child instanceof Connection)
{
ConnectionMBean connectionMBean = new ConnectionMBean((Connection)child, this);
_children.put(child, connectionMBean);
-
- }
- else
- {
- LOGGER.debug("Unsupported child : " + child.getName() + " type : " + child.getClass());
}
-
}
catch(Exception e)
{
LOGGER.error("Exception while creating mbean for " + child.getClass().getSimpleName() + " " + child.getName(), e);
}
+ createAdditionalMBeans(child);
}
}
public void childRemoved(ConfiguredObject object, ConfiguredObject child)
{
- synchronized (_children)
+ synchronized (_childrenLock)
{
AMQManagedObject mbean = _children.remove(child);
if(mbean != null)
@@ -196,6 +232,26 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
LOGGER.error("Exception while unregistering mbean for " + child.getClass().getSimpleName() + " " + child.getName(), e);
}
}
+ unregisterAdditionalMBeansIfPresent(child);
+ }
+ }
+
+ private void unregisterAdditionalMBeansIfPresent(ConfiguredObject child)
+ {
+ Collection<ManagedObject> additionalMBeans = _additionalChildren.remove(child);
+ if (additionalMBeans != null)
+ {
+ for (ManagedObject mbean : additionalMBeans)
+ {
+ try
+ {
+ mbean.unregister();
+ }
+ catch(Exception e)
+ {
+ LOGGER.error("Exception while unregistering mbean for " + child.getClass().getSimpleName() + " " + child.getName(), e);
+ }
+ }
}
}
@@ -213,7 +269,7 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
public Collection<QueueMBean> getQueues()
{
Collection<AMQManagedObject> children;
- synchronized (_children)
+ synchronized (_childrenLock)
{
children = new ArrayList<AMQManagedObject>(_children.values());
}
@@ -235,7 +291,7 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
{
_virtualHost.removeChangeListener(this);
- synchronized (_children)
+ synchronized (_childrenLock)
{
for (AMQManagedObject mbean : _children.values())
{
@@ -252,6 +308,11 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
}
}
_children.clear();
+
+ for (ConfiguredObject child : new ArrayList<ConfiguredObject>(_additionalChildren.keySet()))
+ {
+ unregisterAdditionalMBeansIfPresent(child);
+ }
}
_managerMBean.unregister();
}