summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java130
1 files changed, 118 insertions, 12 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 92115dd39f..d045ae01fa 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -95,7 +95,7 @@ import org.apache.qpid.server.util.DaemonThreadFactory;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval";
- public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
+ public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.replication.db_ping_socket_timeout";
public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval";
private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
@@ -289,10 +289,20 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
try
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing replicated environment");
+ }
+
closeEnvironment();
}
finally
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Deregistering environment home " + _environmentDirectory);
+ }
+
EnvHomeRegistry.getInstance().deregisterHome(_environmentDirectory);
}
}
@@ -823,6 +833,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private void closeEnvironment()
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Closing JE environment for " + _prettyGroupNodeName);
+ }
+
// Clean the log before closing. This makes sure it doesn't contain
// redundant data. Closing without doing this means the cleaner may not
// get a chance to finish.
@@ -1094,15 +1109,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return environment;
}
- NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
- {
- if (repNode == null)
- {
- throw new IllegalArgumentException("Node cannot be null");
- }
- return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState();
- }
-
public int getNumberOfElectableGroupMembers()
{
if (_state.get() != State.OPEN)
@@ -1181,6 +1187,105 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ Set<String> getPermittedNodes()
+ {
+ return Collections.unmodifiableSet(_permittedNodes);
+ }
+
+ public static NodeState getRemoteNodeState(String groupName, ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+ {
+ if (repNode == null)
+ {
+ throw new IllegalArgumentException("Node cannot be null");
+ }
+ return new DbPing(repNode, groupName, DB_PING_SOCKET_TIMEOUT).getNodeState();
+ }
+
+ public static Set<String> convertApplicationStateBytesToPermittedNodeList(byte[] applicationState)
+ {
+ if (applicationState == null || applicationState.length == 0)
+ {
+ return Collections.emptySet();
+ }
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class);
+ return new HashSet<String>((Collection<String>)settings.get(PERMITTED_NODE_LIST));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Unexpected exception on de-serializing of application state", e);
+ }
+ }
+
+ public static void connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort));
+ }
+
+ if (helperNodeName == null || "".equals(helperNodeName))
+ {
+ throw new IllegalConfigurationException(String.format("A helper node is not specified for node '%s'"
+ + " joining the group '%s'", nodeName, groupName));
+ }
+
+ Collection<String> permittedNodes = null;
+ try
+ {
+ ReplicationNodeImpl node = new ReplicationNodeImpl(helperNodeName, helperHostPort);
+ NodeState state = getRemoteNodeState(groupName, node);
+ byte[] applicationState = state.getAppState();
+ permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e);
+ }
+ catch (ServiceConnectFailedException e)
+ {
+ throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'",
+ helperNodeName, helperHostPort), e);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes)));
+ }
+
+ if (permittedNodes==null || !permittedNodes.contains(hostPort))
+ {
+ throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort));
+ }
+ }
+
+ private void findMasterNodeStateAndApplyPermittedNodes(Collection<NodeState> nodeStates)
+ {
+ if (ReplicatedEnvironment.State.MASTER != _environment.getState())
+ {
+ for (NodeState nodeState : nodeStates)
+ {
+ if (nodeState.getNodeState() == ReplicatedEnvironment.State.MASTER)
+ {
+ byte[] applicationState = nodeState.getAppState();
+ Set<String> permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState);
+ if (!_permittedNodes.equals(permittedNodes))
+ {
+ setPermittedNodes(permittedNodes);
+ }
+ break;
+ }
+ }
+ }
+ }
+
private void registerAppStateMonitorIfPermittedNodesSpecified()
{
if (!_permittedNodes.isEmpty())
@@ -1286,8 +1391,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
executeDatabasePingerOnNodeChangesIfMaster(nodeStates);
notifyGroupListenerAboutNodeStates(nodeStates);
- }
+ findMasterNodeStateAndApplyPermittedNodes(nodeStates.values());
+ }
}
finally
{
@@ -1384,7 +1490,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
NodeState nodeStateObject = null;
try
{
- nodeStateObject = getRemoteNodeState(node);
+ nodeStateObject = getRemoteNodeState((String)_configuration.getGroupName(), node);
}
catch (IOException | ServiceConnectFailedException e )
{