diff options
author | Keith Wall <kwall@apache.org> | 2014-01-31 13:21:51 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-01-31 13:21:51 +0000 |
commit | 622266139f1914769605b7a77cdc48b3e5e82e07 (patch) | |
tree | 9687e3101a95f9633e9c9d5c2fd841f694dff660 | |
parent | f5bb2323007e5071e42b1de684317af388936dd4 (diff) | |
download | qpid-python-622266139f1914769605b7a77cdc48b3e5e82e07.tar.gz |
QPID-5409: Setting attributes on LocalReplicationNode now causes the corresponding update to the facade.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1563131 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 716 insertions, 213 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java index 556bcc54c3..042144efe4 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java @@ -43,19 +43,25 @@ import org.apache.qpid.server.util.ParameterizedTypeImpl; import com.sleepycat.je.Durability; import com.sleepycat.je.Durability.ReplicaAckPolicy; import com.sleepycat.je.Durability.SyncPolicy; +import com.sleepycat.je.rep.ReplicatedEnvironment; public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode { private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); + static final boolean DEFAULT_DESIGNATED_PRIMARY = false; + static final int DEFAULT_PRIORITY = 1; + static final int DEFAULT_QUORUM_OVERRIDE = 0; @SuppressWarnings("serial") static final Map<String, Object> DEFAULTS = new HashMap<String, Object>() {{ put(DURABILITY, DEFAULT_DURABILITY.toString()); put(COALESCING_SYNC, true); - put(DESIGNATED_PRIMARY, false); + put(DESIGNATED_PRIMARY, DEFAULT_DESIGNATED_PRIMARY); + put(PRIORITY, DEFAULT_PRIORITY); + put(QUORUM_OVERRIDE, DEFAULT_QUORUM_OVERRIDE); //TODO: add defaults for parameters and replicatedParameters }}; @@ -77,10 +83,16 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication put(PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class)); put(REPLICATION_PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class)); put(STORE_PATH, String.class); + put(LAST_KNOWN_REPLICATION_TRANSACTION_ID, Long.class); }}; + static final String[] IMMUTABLE_ATTRIBUTES = {ReplicationNode.GROUP_NAME, ReplicationNode.HELPER_HOST_PORT, + ReplicationNode.HOST_PORT, ReplicationNode.COALESCING_SYNC, ReplicationNode.DURABILITY, + ReplicationNode.JOIN_TIME, ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID, ReplicationNode.NAME, + ReplicationNode.STORE_PATH, ReplicationNode.PARAMETERS, ReplicationNode.REPLICATION_PARAMETERS}; + private final VirtualHost _virtualHost; - private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; + private volatile ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; //TODO: add state management public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor) @@ -131,13 +143,6 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication } @Override - public State getDesiredState() - { - // TODO - return getActualState(); - } - - @Override public State getActualState() { // TODO @@ -220,49 +225,25 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication { return _replicatedEnvironmentFacade.getNodeState(); } - else if(DURABILITY.equals(attributeName)) - { - return _replicatedEnvironmentFacade.getDurability(); - } - else if(PRIORITY.equals(attributeName)) - { - return _replicatedEnvironmentFacade.getPriority(); - } - else if(GROUP_NAME.equals(attributeName)) - { - return _replicatedEnvironmentFacade.getGroupName(); - } - else if(NAME.equals(attributeName)) - { - return _replicatedEnvironmentFacade.getNodeName(); - } - else if(HOST_PORT.equals(attributeName)) + else if(JOIN_TIME.equals(attributeName)) { - return _replicatedEnvironmentFacade.getHostPort(); + return _replicatedEnvironmentFacade.getJoinTime(); } - else if(HELPER_HOST_PORT.equals(attributeName)) + else if(LAST_KNOWN_REPLICATION_TRANSACTION_ID.equals(attributeName)) { - return _replicatedEnvironmentFacade.getHelperHostPort(); + return _replicatedEnvironmentFacade.getLastKnownReplicationTransactionId(); } - else if(COALESCING_SYNC.equals(attributeName)) + else if(QUORUM_OVERRIDE.equals(attributeName)) { - return _replicatedEnvironmentFacade.isCoalescingSync(); + return _replicatedEnvironmentFacade.getElectableGroupSizeOverride(); } else if(DESIGNATED_PRIMARY.equals(attributeName)) { return _replicatedEnvironmentFacade.isDesignatedPrimary(); } - else if(QUORUM_OVERRIDE.equals(attributeName)) - { - return _replicatedEnvironmentFacade.getQuorumOverride(); - } - else if(JOIN_TIME.equals(attributeName)) - { - return _replicatedEnvironmentFacade.getJoinTime(); - } - else if(LAST_KNOWN_REPLICATION_TRANSACTION_ID.equals(attributeName)) + else if(PRIORITY.equals(attributeName)) { - return _replicatedEnvironmentFacade.getLastKnownReplicationTransactionId(); + return _replicatedEnvironmentFacade.getPriority(); } } return super.getAttribute(attributeName); @@ -300,7 +281,113 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication throws IllegalStateException, AccessControlException, IllegalArgumentException { - throw new UnsupportedOperationException(); + Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES); + + checkWhetherImmutableAttributeChanged(convertedAttributes); + + updateReplicatedEnvironmentFacade(convertedAttributes); + + super.changeAttributes(convertedAttributes); + } + + private void updateReplicatedEnvironmentFacade(Map<String, Object> convertedAttributes) + { + if (_replicatedEnvironmentFacade != null) + { + if (convertedAttributes.get(PRIORITY) != null) + { + int priority = (Integer)convertedAttributes.get(PRIORITY); + try + { + _replicatedEnvironmentFacade.setPriority(priority); + } + catch(Exception e) + { + throw new IllegalConfigurationException("Cannot set attribute " + PRIORITY + " to " + priority, e); + } + } + + if (convertedAttributes.get(DESIGNATED_PRIMARY) != null) + { + boolean designatedPrimary = (Boolean)convertedAttributes.get(DESIGNATED_PRIMARY); + try + { + _replicatedEnvironmentFacade.setDesignatedPrimary(designatedPrimary); + } + catch(Exception e) + { + throw new IllegalConfigurationException("Cannot set attribute '" + DESIGNATED_PRIMARY + "' to " + designatedPrimary, e); + } + } + + if (convertedAttributes.get(QUORUM_OVERRIDE) != null) + { + int quorumOverride = (Integer)convertedAttributes.get(QUORUM_OVERRIDE); + try + { + _replicatedEnvironmentFacade.setElectableGroupSizeOverride(quorumOverride); + } + catch(Exception e) + { + throw new IllegalConfigurationException("Cannot set attribute '" + QUORUM_OVERRIDE + "' to " + quorumOverride, e); + } + } + } + + if (convertedAttributes.containsKey(ROLE)) + { + String currentRole = (String)getAttribute(ROLE); + if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole)) + { + throw new IllegalConfigurationException("Cannot transfer mastership when not a replica"); + } + + // we do not want to write role into the store + String role = (String)convertedAttributes.remove(ROLE); + + if (ReplicatedEnvironment.State.MASTER.name().equals(role) ) + { + try + { + _replicatedEnvironmentFacade.transferMasterToSelfAsynchronously(); + } + catch(Exception e) + { + throw new IllegalConfigurationException("Cannot transfer mastership", e); + } + } + else + { + throw new IllegalConfigurationException("Changing role to other value then " + ReplicatedEnvironment.State.MASTER.name() + " is unsupported"); + } + } + } + + private void checkWhetherImmutableAttributeChanged(Map<String, Object> convertedAttributes) + { + for (int i = 0; i < IMMUTABLE_ATTRIBUTES.length; i++) + { + String attributeName = IMMUTABLE_ATTRIBUTES[i]; + if (convertedAttributes.containsKey(attributeName)) + { + Object newValue = convertedAttributes.get(attributeName); + Object currentValue = getAttribute(attributeName); + if (currentValue == null) + { + if (newValue != null) + { + throw new IllegalConfigurationException("Cannot change value of immutable attribute " + attributeName); + } + } + else + { + if (!currentValue.equals(newValue)) + { + throw new IllegalConfigurationException("Cannot change value of immutable attribute " + attributeName); + } + } + } + } } protected VirtualHost getVirtualHost() @@ -329,4 +416,9 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication _replicatedEnvironmentFacade = replicatedEnvironmentFacade; } + public Object getActualAttribute(String attributeName) + { + return super.getAttribute(attributeName); + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index f7727f6759..8c56cb9988 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -28,6 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS; import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.*; import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; import java.io.File; @@ -86,11 +87,16 @@ import com.sleepycat.je.utilint.PropUtil; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval"; + public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l; private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL); + private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60; + + public static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); + @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ @@ -134,33 +140,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; - private final String _prettyGroupNodeName; - private final String _groupName; - private final String _nodeName; - private final String _nodeHostPort; - private final String _helperHostPort; + private final LocalReplicationNode _replicationNode; private final Durability _durability; - private final boolean _designatedPrimary; - private final boolean _coalescingSync; + private final Boolean _coalescingSync; + private final String _prettyGroupNodeName; private final File _environmentDirectory; - private final Map<String, String> _environmentParameters; - private final Map<String, String> _replicationEnvironmentParameters; + private final ExecutorService _restartEnvironmentExecutor; private final ScheduledExecutorService _groupChangeExecutor; private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>(); private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory; - private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); + private volatile ReplicatedEnvironment _environment; private long _joinTime; - private String _lastKnownReplicationTransactionId; + private long _lastKnownReplicationTransactionId; - @SuppressWarnings("unchecked") - public ReplicatedEnvironmentFacade(org.apache.qpid.server.model.ReplicationNode replicationNode, - RemoteReplicationNodeFactory remoteReplicationNodeFactory) + public ReplicatedEnvironmentFacade(LocalReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory) { _environmentDirectory = new File((String)replicationNode.getAttribute(STORE_PATH)); if (!_environmentDirectory.exists()) @@ -172,16 +171,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - _groupName = (String)replicationNode.getAttribute(GROUP_NAME); - _nodeName = replicationNode.getName(); - _nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);; - _helperHostPort = (String)replicationNode.getAttribute(HELPER_HOST_PORT); - _durability = Durability.parse((String)replicationNode.getAttribute(DURABILITY)); - _designatedPrimary = (Boolean)replicationNode.getAttribute(DESIGNATED_PRIMARY); - _coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC); - _environmentParameters = (Map<String, String>)replicationNode.getAttribute(PARAMETERS); - _replicationEnvironmentParameters = (Map<String, String>)replicationNode.getAttribute(REPLICATION_PARAMETERS); - _prettyGroupNodeName = _groupName + ":" + _nodeName; + _replicationNode = replicationNode; + + _durability = Durability.parse((String)_replicationNode.getAttribute(DURABILITY)); + _coalescingSync = (Boolean)_replicationNode.getAttribute(COALESCING_SYNC); + _prettyGroupNodeName = (String)_replicationNode.getAttribute(GROUP_NAME) + ":" + _replicationNode.getName(); _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName)); _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); @@ -239,6 +233,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe) { + //TODO: restart environment if dbe instanceof MasterReplicaTransitionException boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException); if (restart) { @@ -402,22 +397,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public String getGroupName() { - return _groupName; + return (String)_replicationNode.getAttribute(GROUP_NAME); } public String getNodeName() { - return _nodeName; + return _replicationNode.getName(); } public String getHostPort() { - return _nodeHostPort; + return (String)_replicationNode.getAttribute(HOST_PORT); } public String getHelperHostPort() { - return _helperHostPort; + return (String)_replicationNode.getAttribute(HELPER_HOST_PORT); } public String getDurability() @@ -432,8 +427,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public String getNodeState() { - ReplicatedEnvironment.State state = _environment.getState(); - return state.toString(); + try + { + ReplicatedEnvironment.State state = _environment.getState(); + return state.toString(); + } + catch (IllegalStateException ise) + { + // Environment must be being recreated + return ReplicatedEnvironment.State.UNKNOWN.name(); + } } public boolean isDesignatedPrimary() @@ -441,11 +444,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return _environment.getRepMutableConfig().getDesignatedPrimary(); } - public int getQuorumOverride() - { - return _environment.getRepMutableConfig().getElectableGroupSizeOverride(); - } - public List<Map<String, String>> getGroupMembers() { List<Map<String, String>> members = new ArrayList<Map<String, String>>(); @@ -520,28 +518,113 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - public int getPriority() + int getPriority() { ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); return repConfig.getNodePriority(); } - public int getElectableGroupSizeOverride() + public void setPriority(int priority) throws AMQStoreException + { + checkNotOpeningAndEnvironmentIsValid(); + + try + { + final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority); + _environment.setRepMutableConfig(newConfig); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority); + } + } + catch (DatabaseException e) + { + // TODO: I am not sure about the exception handing here + throw handleDatabaseException("Cannot set priority on " + _prettyGroupNodeName, e); + } + } + + private void checkNotOpeningAndEnvironmentIsValid() + { + if (_state.get() == State.OPENING) + { + throw new IllegalStateException("Environment facade is in opening state"); + } + + if (!_environment.isValid()) + { + throw new IllegalStateException("Environment is not valid"); + } + } + + int getElectableGroupSizeOverride() { ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); return repConfig.getElectableGroupSizeOverride(); } + public void setElectableGroupSizeOverride(int electableGroupOverride) throws AMQStoreException + { + checkNotOpeningAndEnvironmentIsValid(); + + try + { + final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride); + _environment.setRepMutableConfig(newConfig); + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride); + } + } + catch (DatabaseException e) + { + // TODO: I am not sure about the exception handing here + throw handleDatabaseException("Cannot set electable group size override on " + _prettyGroupNodeName, e); + } + } + + public long getJoinTime() { return _joinTime ; } - public String getLastKnownReplicationTransactionId() + public long getLastKnownReplicationTransactionId() { return _lastKnownReplicationTransactionId; } + public void transferMasterToSelfAsynchronously() throws AMQStoreException + { + checkNotOpeningAndEnvironmentIsValid(); + + _groupChangeExecutor.submit(new Runnable() + { + @Override + public void run() + { + try + { + ReplicationGroupAdmin admin = createReplicationGroupAdmin(); + String newMaster = admin.transferMaster(Collections.singleton(getNodeName()), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("The mastership has been transfered to " + newMaster); + } + } + catch (DatabaseException e) + { + LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName + + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e); + } + } + }); + } + public ReplicatedEnvironment getEnvironment() { return _environment; @@ -610,7 +693,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan final ReplicationConfig repConfig = _environment.getRepConfig(); helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); - return new ReplicationGroupAdmin(_groupName, helpers); + return new ReplicationGroupAdmin((String)_replicationNode.getAttribute(GROUP_NAME), helpers); } private void closeEnvironment() @@ -713,35 +796,49 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + @SuppressWarnings("unchecked") private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread) { + String groupName = (String)_replicationNode.getActualAttribute(GROUP_NAME); + String helperHostPort = (String)_replicationNode.getActualAttribute(HELPER_HOST_PORT); + String hostPort = (String)_replicationNode.getActualAttribute(HOST_PORT); + Map<String, String> environmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(PARAMETERS); + Map<String, String> replicationEnvironmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(REPLICATION_PARAMETERS); + Boolean designatedPrimary = (Boolean)_replicationNode.getActualAttribute(DESIGNATED_PRIMARY); + Integer priority = (Integer)_replicationNode.getActualAttribute(PRIORITY); + Integer quorumOverride = (Integer)_replicationNode.getActualAttribute(QUORUM_OVERRIDE); + if (LOGGER.isInfoEnabled()) { LOGGER.info("Creating environment"); LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); - LOGGER.info("Group name " + _groupName); - LOGGER.info("Node name " + _nodeName); - LOGGER.info("Node host port " + _nodeHostPort); - LOGGER.info("Helper host port " + _helperHostPort); + LOGGER.info("Group name " + groupName); + LOGGER.info("Node name " + _replicationNode.getName()); + LOGGER.info("Node host port " + hostPort); + LOGGER.info("Helper host port " + helperHostPort); LOGGER.info("Durability " + _durability); LOGGER.info("Coalescing sync " + _coalescingSync); - LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); + LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); + LOGGER.info("Node priority " + priority); + LOGGER.info("Quorum override " + quorumOverride); } Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS); - if (_replicationEnvironmentParameters != null && !_replicationEnvironmentParameters.isEmpty()) + if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty()) { - replicationEnvironmentSettings.putAll(_replicationEnvironmentParameters); + replicationEnvironmentSettings.putAll(replicationEnvironmentParameters); } Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS); - if (_environmentParameters != null && !_environmentParameters.isEmpty()) + if (environmentParameters != null && !environmentParameters.isEmpty()) { - environmentSettings.putAll(_environmentParameters); + environmentSettings.putAll(environmentParameters); } - ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); - replicationConfig.setHelperHosts(_helperHostPort); - replicationConfig.setDesignatedPrimary(_designatedPrimary); + ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _replicationNode.getName(), hostPort); + replicationConfig.setHelperHosts(helperHostPort); + replicationConfig.setDesignatedPrimary(designatedPrimary); + replicationConfig.setNodePriority(priority); + replicationConfig.setElectableGroupSizeOverride(quorumOverride); for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet()) { @@ -853,9 +950,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public void run() { + String groupName = (String)_replicationNode.getAttribute(GROUP_NAME); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Checking for changes in the group " + _groupName); + LOGGER.debug("Checking for changes in the group " + groupName); } ReplicatedEnvironment env = _environment; @@ -876,7 +974,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + _groupName + "'"); + LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'"); } RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); @@ -901,7 +999,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan String replicationNodeName = replicationNodeEntry.getKey(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + _groupName + "'"); + LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + groupName + "'"); } _remoteReplicationNodes.remove(replicationNodeName); if (replicationGroupListener != null) @@ -950,11 +1048,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } catch (ExecutionException e) { - LOGGER.warn("Cannot update node state for group " + _groupName, e.getCause()); + LOGGER.warn("Cannot update node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME), e.getCause()); } catch (TimeoutException e) { - LOGGER.warn("Timeout whilst updating node state for group " + _groupName); + LOGGER.warn("Timeout whilst updating node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME)); future.cancel(true); } } @@ -1024,4 +1122,5 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 0ddd7134ac..26651ac64c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; import com.sleepycat.je.Durability; import com.sleepycat.je.Durability.SyncPolicy; +//TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class? public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory { @@ -49,6 +50,8 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact { throw new IllegalStateException("Cannot find local replication node among virtual host nodes"); } + LocalReplicationNode localReplicationNode = (LocalReplicationNode)localNode; + String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY); Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC); @@ -58,8 +61,8 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact + "! Please set highAvailability.coalescingSync to false in store configuration."); } - ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localNode, new RemoteReplicationNodeFactoryImpl(virtualHost)); - ((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade); + ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localReplicationNode, new RemoteReplicationNodeFactoryImpl(virtualHost)); + localReplicationNode.setReplicatedEnvironmentFacade(facade); return facade; } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java index 330abce5cf..bfe79c3dfa 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java @@ -21,11 +21,14 @@ package org.apache.qpid.server.store.berkeleydb.replication; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.UUID; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -34,6 +37,8 @@ import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import com.sleepycat.je.rep.ReplicatedEnvironment; + public class LocalReplicationNodeTest extends QpidTestCase { @@ -41,6 +46,7 @@ public class LocalReplicationNodeTest extends QpidTestCase private UUID _id; private VirtualHost _virtualHost; private TaskExecutor _taskExecutor; + private ReplicatedEnvironmentFacade _facade; @Override public void setUp() throws Exception @@ -48,6 +54,7 @@ public class LocalReplicationNodeTest extends QpidTestCase super.setUp(); _taskExecutor = mock(TaskExecutor.class); _virtualHost = mock(VirtualHost.class); + _facade = mock(ReplicatedEnvironmentFacade.class); } @Override @@ -129,51 +136,153 @@ public class LocalReplicationNodeTest extends QpidTestCase assertNodeAttributes(attributes, node); } - public void testSetReplicatedEnvironmentFacade() + public void testGetValuesFromReplicatedEnvironmentFacade() { - long joinTime = System.currentTimeMillis(); - int priority = 1; - String masterState = "MASTER"; - int quorumOverride = 2; - int port = 9999; - String hostPort = "localhost:" + port; - String groupName = getTestName(); - String storePath = TMP_FOLDER + File.separator + groupName; - boolean designatedPrimary = true; - String nodeName = "nodeName"; - - Map<String, Object> attributes = createValidAttributes(); - attributes.put(ReplicationNode.HOST_PORT, hostPort); - attributes.put(ReplicationNode.HELPER_HOST_PORT, hostPort); - attributes.put(ReplicationNode.STORE_PATH, storePath); - attributes.put(ReplicationNode.DESIGNATED_PRIMARY, designatedPrimary); - attributes.put(ReplicationNode.GROUP_NAME, groupName); - attributes.put(ReplicationNode.NAME, nodeName); - LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); assertNull("Unexpected role attribute", node.getAttribute(ReplicationNode.ROLE)); - assertNull("Unexpected quorum override attribute", node.getAttribute(ReplicationNode.QUORUM_OVERRIDE)); - assertNull("Unexpected priority attribute", node.getAttribute(ReplicationNode.PRIORITY)); assertNull("Unexpected join time attribute", node.getAttribute(ReplicationNode.JOIN_TIME)); + assertNull("Unexpected last transaction id", node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)); + assertEquals("Unexpected priority attribute", LocalReplicationNode.DEFAULT_PRIORITY, node.getAttribute(ReplicationNode.PRIORITY)); + assertEquals("Unexpected quorum override attribute", LocalReplicationNode.DEFAULT_QUORUM_OVERRIDE, node.getAttribute(ReplicationNode.QUORUM_OVERRIDE)); + assertEquals("Unexpected designated primary attribute", LocalReplicationNode.DEFAULT_DESIGNATED_PRIMARY, node.getAttribute(ReplicationNode.DESIGNATED_PRIMARY)); + + String masterState = "MASTER"; + long joinTime = System.currentTimeMillis(); + long lastKnowTransactionId = 1000l; + boolean designatedPrimary = true; + int priority = 2; + int quorumOverride = 3; + + when(_facade.getNodeState()).thenReturn(masterState); + when(_facade.getJoinTime()).thenReturn(joinTime); + when(_facade.getLastKnownReplicationTransactionId()).thenReturn(lastKnowTransactionId); + when(_facade.isDesignatedPrimary()).thenReturn(designatedPrimary); + when(_facade.getPriority()).thenReturn(priority); + when(_facade.getElectableGroupSizeOverride()).thenReturn(quorumOverride); - ReplicatedEnvironmentFacade facade = mock(ReplicatedEnvironmentFacade.class); - when(facade.getNodeState()).thenReturn(masterState); - when(facade.getPriority()).thenReturn(priority); - when(facade.getJoinTime()).thenReturn(joinTime); - when(facade.getQuorumOverride()).thenReturn(quorumOverride); - when(facade.getGroupName()).thenReturn(groupName); - when(facade.getHelperHostPort()).thenReturn(hostPort); - when(facade.getHostPort()).thenReturn(hostPort); - when(facade.isDesignatedPrimary()).thenReturn(designatedPrimary); - when(facade.getNodeName()).thenReturn(nodeName); - - node.setReplicatedEnvironmentFacade(facade); + node.setReplicatedEnvironmentFacade(_facade); assertEquals("Unexpected role attribute", masterState, node.getAttribute(ReplicationNode.ROLE)); - assertEquals("Unexpected quorum override attribute", quorumOverride, node.getAttribute(ReplicationNode.QUORUM_OVERRIDE)); - assertEquals("Unexpected priority attribute", priority, node.getAttribute(ReplicationNode.PRIORITY)); assertEquals("Unexpected join time attribute", joinTime, node.getAttribute(ReplicationNode.JOIN_TIME)); + assertEquals("Unexpected last transaction id attribute", lastKnowTransactionId, node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)); + assertEquals("Unexpected priority attribute", priority, node.getAttribute(ReplicationNode.PRIORITY)); + assertEquals("Unexpected quorum override attribute", quorumOverride, node.getAttribute(ReplicationNode.QUORUM_OVERRIDE)); + } - assertNodeAttributes(attributes, node); + public void testSetDesignatedPrimary() throws Exception + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + node.setReplicatedEnvironmentFacade(_facade); + + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIGNATED_PRIMARY, true)); + + verify(_facade).setDesignatedPrimary(true); + + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIGNATED_PRIMARY, false)); + verify(_facade).setDesignatedPrimary(false); + } + + public void testSetPriority() throws Exception + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + node.setReplicatedEnvironmentFacade(_facade); + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.PRIORITY, 100)); + + verify(_facade).setPriority(100); + } + + public void testSetQuorumOverride() throws Exception + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + node.setReplicatedEnvironmentFacade(_facade); + + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.QUORUM_OVERRIDE, 10)); + + verify(_facade).setElectableGroupSizeOverride(10); + } + + public void testSetRole() throws Exception + { + when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name()); + + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + node.setReplicatedEnvironmentFacade(_facade); + + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name())); + + verify(_facade).transferMasterToSelfAsynchronously(); + } + + public void testSetRoleToReplicaUnsupported() throws Exception + { + when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name()); + + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + node.setReplicatedEnvironmentFacade(_facade); + + try + { + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.REPLICA.name())); + fail("Exception not thrown"); + } + catch(IllegalConfigurationException e) + { + // PASS + } + } + + public void testSetRoleWhenCurrentRoleNotRepliaIsUnsupported() throws Exception + { + when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.MASTER.name()); + + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + node.setReplicatedEnvironmentFacade(_facade); + + try + { + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name())); + fail("Exception not thrown"); + } + catch(IllegalConfigurationException e) + { + // PASS + } + } + + public void testSetImmutableAttributesThrowException() throws Exception + { + Map<String, Object> changeAttributeMap = new HashMap<String, Object>(); + changeAttributeMap.put(ReplicationNode.GROUP_NAME, "newGroupName"); + changeAttributeMap.put(ReplicationNode.HELPER_HOST_PORT, "newhost:1234"); + changeAttributeMap.put(ReplicationNode.HOST_PORT, "newhost:1234"); + changeAttributeMap.put(ReplicationNode.COALESCING_SYNC, Boolean.FALSE); + changeAttributeMap.put(ReplicationNode.DURABILITY, "durability"); + changeAttributeMap.put(ReplicationNode.JOIN_TIME, 1000l); + changeAttributeMap.put(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID, 10001l); + changeAttributeMap.put(ReplicationNode.NAME, "newName"); + changeAttributeMap.put(ReplicationNode.STORE_PATH, "/not/used"); + changeAttributeMap.put(ReplicationNode.PARAMETERS, Collections.emptyMap()); + changeAttributeMap.put(ReplicationNode.REPLICATION_PARAMETERS, Collections.emptyMap()); + + for (Entry<String, Object> entry : changeAttributeMap.entrySet()) + { + assertSetAttributesThrowsException(entry.getKey(), entry.getValue()); + } + } + + private void assertSetAttributesThrowsException(String attributeName, Object attributeValue) + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + + try + { + node.setAttributes(Collections.<String, Object>singletonMap(attributeName, attributeValue)); + fail("Operation to change attribute '" + attributeName + "' should fail"); + } + catch(IllegalConfigurationException e) + { + // pass + } } private Map<String, Object> createValidAttributes() diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index cea7d52d43..1bce41403f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -28,7 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; import static org.apache.qpid.server.model.ReplicationNode.NAME; import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; -import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; +import static org.apache.qpid.server.model.ReplicationNode.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -79,6 +79,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString(); private static final boolean TEST_DESIGNATED_PRIMARY = false; private static final boolean TEST_COALESCING_SYNC = true; + private static final int TEST_PRIORITY = 10; + private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0; private File _storePath; private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); @@ -122,7 +124,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase } public void testEnvironmentFacade() throws Exception { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + EnvironmentFacade ef = createMaster(); assertNotNull("Environment should not be null", ef); Environment e = ef.getEnvironment(); assertTrue("Environment is not valid", e.isValid()); @@ -130,7 +132,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testClose() throws Exception { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + EnvironmentFacade ef = createMaster(); ef.close(); Environment e = ef.getEnvironment(); @@ -139,7 +141,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testOpenDatabases() throws Exception { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + EnvironmentFacade ef = createMaster(); DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setTransactional(true); dbConfig.setAllowCreate(true); @@ -153,7 +155,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testGetOpenDatabaseForNonExistingDatabase() throws Exception { - EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster(); + EnvironmentFacade ef = createMaster(); DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setTransactional(true); dbConfig.setAllowCreate(true); @@ -173,50 +175,42 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testGetGroupName() throws Exception { - assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName()); + assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName()); } public void testGetNodeName() throws Exception { - assertEquals("Unexpected group name", TEST_NODE_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getNodeName()); + assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName()); } public void testGetNodeHostPort() throws Exception { - assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHostPort()); + assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort()); } public void testGetHelperHostPort() throws Exception { - assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHelperHostPort()); + assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort()); } public void testGetDurability() throws Exception { - assertEquals("Unexpected durability", TEST_DURABILITY.toString(), ((ReplicatedEnvironmentFacade) createMaster()).getDurability()); + assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability()); } public void testIsCoalescingSync() throws Exception { - assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, ((ReplicatedEnvironmentFacade) createMaster()).isCoalescingSync()); + assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync()); } public void testGetNodeState() throws Exception { - assertEquals("Unexpected state", State.MASTER.name(), ((ReplicatedEnvironmentFacade) createMaster()).getNodeState()); - } - - public void testIsDesignatedPrimary() throws Exception - { - ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); - assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); - master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); - assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState()); } public void testGetGroupMembers() throws Exception { - List<Map<String, String>> groupMembers = ((ReplicatedEnvironmentFacade) createMaster()).getGroupMembers(); + List<Map<String, String>> groupMembers = createMaster().getGroupMembers(); Map<String, String> expectedMember = new HashMap<String, String>(); expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME); expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT); @@ -224,9 +218,35 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected group members", expectedGroupMembers, new HashSet<Map<String, String>>(groupMembers)); } + public void testPriority() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority()); + + facade.setPriority(TEST_PRIORITY + 1); + assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority()); + } + + public void testDesignatedPrimary() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); + assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); + } + + + public void testElectableGroupSizeOverride() throws Exception + { + ReplicatedEnvironmentFacade facade = createMaster(); + assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride()); + facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1); + assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); + } + public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception { - ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); + ReplicatedEnvironmentFacade master = createMaster(); String nodeName2 = TEST_NODE_NAME + "_2"; String host = "localhost"; int port = getNextAvailable(TEST_NODE_PORT + 1); @@ -383,7 +403,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testRemoveNodeFromGroup() throws Exception { - ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster(); + ReplicatedEnvironmentFacade environmentFacade = createMaster(); String node2Name = TEST_NODE_NAME + "_2"; String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1); @@ -398,26 +418,9 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected group members count", 1, groupMembers.size()); } - public void testSetDesignatedPrimary() throws Exception - { - ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster(); - environmentFacade.setDesignatedPrimary(false); - assertFalse("Unexpected designated primary", environmentFacade.isDesignatedPrimary()); - } - - public void testGetNodePriority() throws Exception - { - assertEquals("Unexpected node priority", 1, ((ReplicatedEnvironmentFacade) createMaster()).getPriority()); - } - - public void testGetElectableGroupSizeOverride() throws Exception - { - assertEquals("Unexpected Electable Group Size Override", 0, ((ReplicatedEnvironmentFacade) createMaster()).getElectableGroupSizeOverride()); - } - public void testEnvironmentRestartOnInsufficientReplicas() throws Exception { - ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster(); + ReplicatedEnvironmentFacade master = createMaster(); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); String replica1NodeName = TEST_NODE_NAME + "_1"; @@ -545,11 +548,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS)); return replicaEnvironmentFacade; } + private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { - ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary); + LocalReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary); ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory); ref.setReplicationGroupListener(replicationGroupListener); ref.setStateChangeListener(stateChangeListener); @@ -571,18 +575,31 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase return dbConfig; } - private ReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary) + private LocalReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary) { - ReplicationNode node = mock(ReplicationNode.class); + LocalReplicationNode node = mock(LocalReplicationNode.class); when(node.getAttribute(NAME)).thenReturn(nodeName); when(node.getName()).thenReturn(nodeName); when(node.getAttribute(HOST_PORT)).thenReturn(nodeHostPort); when(node.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary); + when(node.getAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); + when(node.getAttribute(PRIORITY)).thenReturn(TEST_PRIORITY); when(node.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME); when(node.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT); when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY); when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC); + + // TMP REF contract with LRN is too complicated. + when(node.getActualAttribute(HOST_PORT)).thenReturn(nodeHostPort); + when(node.getActualAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary); + when(node.getActualAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); + when(node.getActualAttribute(PRIORITY)).thenReturn(TEST_PRIORITY); + when(node.getActualAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME); + when(node.getActualAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT); + when(node.getActualAttribute(DURABILITY)).thenReturn(TEST_DURABILITY); + when(node.getActualAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC); + Map<String, String> repConfig = new HashMap<String, String>(); repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java new file mode 100644 index 0000000000..b70703c526 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.model.ReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.systest.rest.QpidRestTestCase; +import org.apache.qpid.util.FileUtils; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; + +public class ReplicationNodeRestTest extends QpidRestTestCase +{ + private static final String NODE_NAME = "node1"; + private static final String GROUP_NAME = "replication-group"; + + private String _hostName; + private File _storeFile; + private int _haPort; + private String _nodeRestUrl; + private String _hostRestUrl; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _hostName = getTestName(); + _nodeRestUrl = "/rest/replicationnode/" + _hostName + "/" + NODE_NAME; + _hostRestUrl = "/rest/virtualhost/" + _hostName; + + _storeFile = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); + _haPort = findFreePort(); + + Map<String, Object> hostData = new HashMap<String, Object>(); + hostData.put(VirtualHost.NAME, _hostName); + hostData.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); + hostData.put(VirtualHost.DESIRED_STATE, State.QUIESCED); + + int responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", hostData); + assertEquals("Unexpected response code for virtual host creation request", 201, responseCode); + + String hostPort = "localhost:" + _haPort; + Map<String, Object> nodeData = new HashMap<String, Object>(); + nodeData.put(ReplicationNode.NAME, NODE_NAME); + nodeData.put(ReplicationNode.GROUP_NAME, GROUP_NAME); + nodeData.put(ReplicationNode.HOST_PORT, hostPort); + nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort); + nodeData.put(ReplicationNode.STORE_PATH, _storeFile.getAbsolutePath()); + + responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", nodeData); + assertEquals("Unexpected response code for node creation request", 201, responseCode); + + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_storeFile != null) + { + FileUtils.delete(_storeFile, true); + } + } + } + + public void testChangePriority() throws Exception + { + assertReplicationNodeSetAttribute(ReplicationNode.PRIORITY, 1, 2, 3); + } + + public void testChangeQuorumOverride() throws Exception + { + assertReplicationNodeSetAttribute(ReplicationNode.QUORUM_OVERRIDE, 0, 1, 2); + } + + public void testChangeDesignatedPrimary() throws Exception + { + assertReplicationNodeSetAttribute(ReplicationNode.DESIGNATED_PRIMARY, false, true, false); + } + + public void testCreationOfSecondLocalReplicationNodeFails() throws Exception + { + String hostPort = "localhost:" + _haPort; + Map<String, Object> nodeData = new HashMap<String, Object>(); + nodeData.put(ReplicationNode.NAME, NODE_NAME + 1); + nodeData.put(ReplicationNode.GROUP_NAME, GROUP_NAME); + nodeData.put(ReplicationNode.HOST_PORT, hostPort); + nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort); + nodeData.put(ReplicationNode.STORE_PATH, _storeFile.getAbsolutePath()); + + int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl + 1, "PUT", nodeData); + assertEquals("Adding of a second replication node should fail", 409, responseCode); + } + + public void testUpdateImmutableAttributeWithTheSameValueSucceeds() throws Exception + { + String hostPort = "localhost:" + _haPort; + Map<String, Object> nodeData = new HashMap<String, Object>(); + nodeData.put(ReplicationNode.NAME, NODE_NAME); + nodeData.put(ReplicationNode.GROUP_NAME, GROUP_NAME); + nodeData.put(ReplicationNode.HOST_PORT, hostPort); + nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort); + nodeData.put(ReplicationNode.STORE_PATH, _storeFile.getAbsolutePath()); + + int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", nodeData); + assertEquals("Update with unchanged attribute should succeed", 200, responseCode); + } + + private void assertReplicationNodeSetAttribute(String attributeName, Object initialValue, + Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws IOException, JsonGenerationException, + JsonMappingException + { + Map<String, Object> nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); + assertEquals("Unexpected " + attributeName + " after creation", initialValue, nodeAttributes.get(attributeName)); + + int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(attributeName, newValueBeforeHostActivation)); + assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode); + + nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); + assertEquals("Unexpected " + attributeName + " after update but before host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName)); + + responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE)); + assertEquals("Unexpected response code for virtual host update status", 200, responseCode); + + nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); + assertEquals("Unexpected " + attributeName + " after host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName)); + + responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(attributeName, newValueAfterHostActivation)); + assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode); + + nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); + assertEquals("Unexpected " + attributeName + " after update after host activation", newValueAfterHostActivation, nodeAttributes.get(attributeName)); + } +} diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java index 9811891886..b4a3661039 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java @@ -36,21 +36,49 @@ public class VirtualHostRestTest extends QpidRestTestCase { private static final String VIRTUALHOST_NODES_ATTRIBUTE = "replicationnodes"; + private File _storeFile; + private String _hostName; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _hostName = getTestName(); + + _storeFile = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis()); + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + if (_storeFile != null) + { + FileUtils.delete(_storeFile, true); + } + } + } public void testPutCreateHAVirtualHost() throws Exception { Map<String, Object> hostData = new HashMap<String, Object>(); - String hostName = getTestName(); - hostData.put(VirtualHost.NAME, hostName); + hostData.put(VirtualHost.NAME, _hostName); hostData.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE); hostData.put(VirtualHost.DESIRED_STATE, State.QUIESCED); - int responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData); + int responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + _hostName, "PUT", hostData); assertEquals("Unexpected response code for virtual host creation request", 201, responseCode); - // TODO should observe vh state + Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + _hostName); + assertEquals("Virtual host in unexpected desired state ", State.QUIESCED.name(), hostDetails.get(VirtualHost.DESIRED_STATE)); + assertEquals("Virtual host in unexpected actual state ", State.QUIESCED.name(), hostDetails.get(VirtualHost.STATE)); - String storeLocation = new File(TMP_FOLDER, "store-" + hostName + "-" + System.currentTimeMillis()).getAbsolutePath(); + String storeLocation = _storeFile.getAbsolutePath(); String nodeName = "node1"; String groupName = "replication-group"; int port = findFreePort(); @@ -63,44 +91,35 @@ public class VirtualHostRestTest extends QpidRestTestCase nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort); nodeData.put(ReplicationNode.STORE_PATH, storeLocation); - String createNodeUrl = "/rest/replicationnode/" + hostName + "/" + nodeName; + String createNodeUrl = "/rest/replicationnode/" + _hostName + "/" + nodeName; responseCode = getRestTestHelper().submitRequest(createNodeUrl, "PUT", nodeData); assertEquals("Unexpected response code for node creation request", 201, responseCode); hostData.clear(); hostData.put(VirtualHost.DESIRED_STATE, State.ACTIVE); - responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData); + responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + _hostName, "PUT", hostData); assertEquals("Unexpected response code for virtual host update status", 200, responseCode); - waitForVirtualHostActivation(hostName, 10000l); + waitForVirtualHostActivation(_hostName, 10000l); - Map<String, Object> replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + hostName + "/" + nodeName); + Map<String, Object> replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + _hostName + "/" + nodeName); assertLocalNode(nodeData, replicationNodeDetails); - try - { - // make sure that the host is saved in the broker store - restartBroker(); - Map<String, Object> hostDetails = waitForVirtualHostActivation(hostName, 10000l); - Asserts.assertVirtualHost(hostName, hostDetails); - assertEquals("Unexpected virtual host type", BDBHAVirtualHostFactory.TYPE.toString(), hostDetails.get(VirtualHost.TYPE)); + // make sure that the host is saved in the broker store + restartBroker(); - @SuppressWarnings("unchecked") - List<Map<String, Object>> nodes = (List<Map<String, Object>>) hostDetails.get(VIRTUALHOST_NODES_ATTRIBUTE); - assertEquals("Unexpected number of nodes", 1, nodes.size()); - assertLocalNode(nodeData, nodes.get(0)); + hostDetails = waitForVirtualHostActivation(_hostName, 10000l); + Asserts.assertVirtualHost(_hostName, hostDetails); + assertEquals("Unexpected virtual host type", BDBHAVirtualHostFactory.TYPE.toString(), hostDetails.get(VirtualHost.TYPE)); - // verify that that node rest interface returns the same node attributes - replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + hostName + "/" + nodeName); - assertLocalNode(nodeData, replicationNodeDetails); - } - finally - { - if (storeLocation != null) - { - FileUtils.delete(new File(storeLocation), true); - } - } + @SuppressWarnings("unchecked") + List<Map<String, Object>> nodes = (List<Map<String, Object>>) hostDetails.get(VIRTUALHOST_NODES_ATTRIBUTE); + assertEquals("Unexpected number of nodes", 1, nodes.size()); + assertLocalNode(nodeData, nodes.get(0)); + + // verify that that node rest interface returns the same node attributes + replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + _hostName + "/" + nodeName); + assertLocalNode(nodeData, replicationNodeDetails); } private void assertLocalNode(Map<String, Object> expectedNodeAttributes, Map<String, Object> actualNodesAttributes) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java index 1d064347ad..71002d9f72 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java @@ -54,7 +54,7 @@ public interface ReplicationNode extends ConfiguredObject /** A designated primary setting for 2-nodes group*/ String DESIGNATED_PRIMARY = "designatedPrimary"; - /** Node priority*/ + /** Node priority. 1 signifies normal priority; 0 signifies node will never be elected. */ String PRIORITY = "priority"; /** The overridden minimum number of group nodes required to commit transaction on this node instead of simple majority*/ |