diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java | 130 |
1 files changed, 118 insertions, 12 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 92115dd39f..d045ae01fa 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -95,7 +95,7 @@ import org.apache.qpid.server.util.DaemonThreadFactory; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; - public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; + public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.replication.db_ping_socket_timeout"; public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.remote_node_monitor_interval"; private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); @@ -289,10 +289,20 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { try { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing replicated environment"); + } + closeEnvironment(); } finally { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Deregistering environment home " + _environmentDirectory); + } + EnvHomeRegistry.getInstance().deregisterHome(_environmentDirectory); } } @@ -823,6 +833,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void closeEnvironment() { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Closing JE environment for " + _prettyGroupNodeName); + } + // Clean the log before closing. This makes sure it doesn't contain // redundant data. Closing without doing this means the cleaner may not // get a chance to finish. @@ -1094,15 +1109,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return environment; } - NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException - { - if (repNode == null) - { - throw new IllegalArgumentException("Node cannot be null"); - } - return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState(); - } - public int getNumberOfElectableGroupMembers() { if (_state.get() != State.OPEN) @@ -1181,6 +1187,105 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + Set<String> getPermittedNodes() + { + return Collections.unmodifiableSet(_permittedNodes); + } + + public static NodeState getRemoteNodeState(String groupName, ReplicationNode repNode) throws IOException, ServiceConnectFailedException + { + if (repNode == null) + { + throw new IllegalArgumentException("Node cannot be null"); + } + return new DbPing(repNode, groupName, DB_PING_SOCKET_TIMEOUT).getNodeState(); + } + + public static Set<String> convertApplicationStateBytesToPermittedNodeList(byte[] applicationState) + { + if (applicationState == null || applicationState.length == 0) + { + return Collections.emptySet(); + } + + ObjectMapper objectMapper = new ObjectMapper(); + try + { + Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class); + return new HashSet<String>((Collection<String>)settings.get(PERMITTED_NODE_LIST)); + } + catch (Exception e) + { + throw new RuntimeException("Unexpected exception on de-serializing of application state", e); + } + } + + public static void connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort)); + } + + if (helperNodeName == null || "".equals(helperNodeName)) + { + throw new IllegalConfigurationException(String.format("A helper node is not specified for node '%s'" + + " joining the group '%s'", nodeName, groupName)); + } + + Collection<String> permittedNodes = null; + try + { + ReplicationNodeImpl node = new ReplicationNodeImpl(helperNodeName, helperHostPort); + NodeState state = getRemoteNodeState(groupName, node); + byte[] applicationState = state.getAppState(); + permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState); + } + catch (IOException e) + { + throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e); + } + catch (ServiceConnectFailedException e) + { + throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e); + } + catch (Exception e) + { + throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'", + helperNodeName, helperHostPort), e); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes))); + } + + if (permittedNodes==null || !permittedNodes.contains(hostPort)) + { + throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort)); + } + } + + private void findMasterNodeStateAndApplyPermittedNodes(Collection<NodeState> nodeStates) + { + if (ReplicatedEnvironment.State.MASTER != _environment.getState()) + { + for (NodeState nodeState : nodeStates) + { + if (nodeState.getNodeState() == ReplicatedEnvironment.State.MASTER) + { + byte[] applicationState = nodeState.getAppState(); + Set<String> permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState); + if (!_permittedNodes.equals(permittedNodes)) + { + setPermittedNodes(permittedNodes); + } + break; + } + } + } + } + private void registerAppStateMonitorIfPermittedNodesSpecified() { if (!_permittedNodes.isEmpty()) @@ -1286,8 +1391,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan executeDatabasePingerOnNodeChangesIfMaster(nodeStates); notifyGroupListenerAboutNodeStates(nodeStates); - } + findMasterNodeStateAndApplyPermittedNodes(nodeStates.values()); + } } finally { @@ -1384,7 +1490,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan NodeState nodeStateObject = null; try { - nodeStateObject = getRemoteNodeState(node); + nodeStateObject = getRemoteNodeState((String)_configuration.getGroupName(), node); } catch (IOException | ServiceConnectFailedException e ) { |