summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-01-31 13:21:51 +0000
committerKeith Wall <kwall@apache.org>2014-01-31 13:21:51 +0000
commit622266139f1914769605b7a77cdc48b3e5e82e07 (patch)
tree9687e3101a95f9633e9c9d5c2fd841f694dff660
parentf5bb2323007e5071e42b1de684317af388936dd4 (diff)
downloadqpid-python-622266139f1914769605b7a77cdc48b3e5e82e07.tar.gz
QPID-5409: Setting attributes on LocalReplicationNode now causes the corresponding update to the facade.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1563131 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java176
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java211
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java7
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java183
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java105
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java164
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java81
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java2
8 files changed, 716 insertions, 213 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
index 556bcc54c3..042144efe4 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
@@ -43,19 +43,25 @@ import org.apache.qpid.server.util.ParameterizedTypeImpl;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode
{
private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
ReplicaAckPolicy.SIMPLE_MAJORITY);
+ static final boolean DEFAULT_DESIGNATED_PRIMARY = false;
+ static final int DEFAULT_PRIORITY = 1;
+ static final int DEFAULT_QUORUM_OVERRIDE = 0;
@SuppressWarnings("serial")
static final Map<String, Object> DEFAULTS = new HashMap<String, Object>()
{{
put(DURABILITY, DEFAULT_DURABILITY.toString());
put(COALESCING_SYNC, true);
- put(DESIGNATED_PRIMARY, false);
+ put(DESIGNATED_PRIMARY, DEFAULT_DESIGNATED_PRIMARY);
+ put(PRIORITY, DEFAULT_PRIORITY);
+ put(QUORUM_OVERRIDE, DEFAULT_QUORUM_OVERRIDE);
//TODO: add defaults for parameters and replicatedParameters
}};
@@ -77,10 +83,16 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
put(PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class));
put(REPLICATION_PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class));
put(STORE_PATH, String.class);
+ put(LAST_KNOWN_REPLICATION_TRANSACTION_ID, Long.class);
}};
+ static final String[] IMMUTABLE_ATTRIBUTES = {ReplicationNode.GROUP_NAME, ReplicationNode.HELPER_HOST_PORT,
+ ReplicationNode.HOST_PORT, ReplicationNode.COALESCING_SYNC, ReplicationNode.DURABILITY,
+ ReplicationNode.JOIN_TIME, ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID, ReplicationNode.NAME,
+ ReplicationNode.STORE_PATH, ReplicationNode.PARAMETERS, ReplicationNode.REPLICATION_PARAMETERS};
+
private final VirtualHost _virtualHost;
- private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private volatile ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
//TODO: add state management
public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor)
@@ -131,13 +143,6 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
}
@Override
- public State getDesiredState()
- {
- // TODO
- return getActualState();
- }
-
- @Override
public State getActualState()
{
// TODO
@@ -220,49 +225,25 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
{
return _replicatedEnvironmentFacade.getNodeState();
}
- else if(DURABILITY.equals(attributeName))
- {
- return _replicatedEnvironmentFacade.getDurability();
- }
- else if(PRIORITY.equals(attributeName))
- {
- return _replicatedEnvironmentFacade.getPriority();
- }
- else if(GROUP_NAME.equals(attributeName))
- {
- return _replicatedEnvironmentFacade.getGroupName();
- }
- else if(NAME.equals(attributeName))
- {
- return _replicatedEnvironmentFacade.getNodeName();
- }
- else if(HOST_PORT.equals(attributeName))
+ else if(JOIN_TIME.equals(attributeName))
{
- return _replicatedEnvironmentFacade.getHostPort();
+ return _replicatedEnvironmentFacade.getJoinTime();
}
- else if(HELPER_HOST_PORT.equals(attributeName))
+ else if(LAST_KNOWN_REPLICATION_TRANSACTION_ID.equals(attributeName))
{
- return _replicatedEnvironmentFacade.getHelperHostPort();
+ return _replicatedEnvironmentFacade.getLastKnownReplicationTransactionId();
}
- else if(COALESCING_SYNC.equals(attributeName))
+ else if(QUORUM_OVERRIDE.equals(attributeName))
{
- return _replicatedEnvironmentFacade.isCoalescingSync();
+ return _replicatedEnvironmentFacade.getElectableGroupSizeOverride();
}
else if(DESIGNATED_PRIMARY.equals(attributeName))
{
return _replicatedEnvironmentFacade.isDesignatedPrimary();
}
- else if(QUORUM_OVERRIDE.equals(attributeName))
- {
- return _replicatedEnvironmentFacade.getQuorumOverride();
- }
- else if(JOIN_TIME.equals(attributeName))
- {
- return _replicatedEnvironmentFacade.getJoinTime();
- }
- else if(LAST_KNOWN_REPLICATION_TRANSACTION_ID.equals(attributeName))
+ else if(PRIORITY.equals(attributeName))
{
- return _replicatedEnvironmentFacade.getLastKnownReplicationTransactionId();
+ return _replicatedEnvironmentFacade.getPriority();
}
}
return super.getAttribute(attributeName);
@@ -300,7 +281,113 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
throws IllegalStateException, AccessControlException,
IllegalArgumentException
{
- throw new UnsupportedOperationException();
+ Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
+
+ checkWhetherImmutableAttributeChanged(convertedAttributes);
+
+ updateReplicatedEnvironmentFacade(convertedAttributes);
+
+ super.changeAttributes(convertedAttributes);
+ }
+
+ private void updateReplicatedEnvironmentFacade(Map<String, Object> convertedAttributes)
+ {
+ if (_replicatedEnvironmentFacade != null)
+ {
+ if (convertedAttributes.get(PRIORITY) != null)
+ {
+ int priority = (Integer)convertedAttributes.get(PRIORITY);
+ try
+ {
+ _replicatedEnvironmentFacade.setPriority(priority);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalConfigurationException("Cannot set attribute " + PRIORITY + " to " + priority, e);
+ }
+ }
+
+ if (convertedAttributes.get(DESIGNATED_PRIMARY) != null)
+ {
+ boolean designatedPrimary = (Boolean)convertedAttributes.get(DESIGNATED_PRIMARY);
+ try
+ {
+ _replicatedEnvironmentFacade.setDesignatedPrimary(designatedPrimary);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalConfigurationException("Cannot set attribute '" + DESIGNATED_PRIMARY + "' to " + designatedPrimary, e);
+ }
+ }
+
+ if (convertedAttributes.get(QUORUM_OVERRIDE) != null)
+ {
+ int quorumOverride = (Integer)convertedAttributes.get(QUORUM_OVERRIDE);
+ try
+ {
+ _replicatedEnvironmentFacade.setElectableGroupSizeOverride(quorumOverride);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalConfigurationException("Cannot set attribute '" + QUORUM_OVERRIDE + "' to " + quorumOverride, e);
+ }
+ }
+ }
+
+ if (convertedAttributes.containsKey(ROLE))
+ {
+ String currentRole = (String)getAttribute(ROLE);
+ if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+ {
+ throw new IllegalConfigurationException("Cannot transfer mastership when not a replica");
+ }
+
+ // we do not want to write role into the store
+ String role = (String)convertedAttributes.remove(ROLE);
+
+ if (ReplicatedEnvironment.State.MASTER.name().equals(role) )
+ {
+ try
+ {
+ _replicatedEnvironmentFacade.transferMasterToSelfAsynchronously();
+ }
+ catch(Exception e)
+ {
+ throw new IllegalConfigurationException("Cannot transfer mastership", e);
+ }
+ }
+ else
+ {
+ throw new IllegalConfigurationException("Changing role to other value then " + ReplicatedEnvironment.State.MASTER.name() + " is unsupported");
+ }
+ }
+ }
+
+ private void checkWhetherImmutableAttributeChanged(Map<String, Object> convertedAttributes)
+ {
+ for (int i = 0; i < IMMUTABLE_ATTRIBUTES.length; i++)
+ {
+ String attributeName = IMMUTABLE_ATTRIBUTES[i];
+ if (convertedAttributes.containsKey(attributeName))
+ {
+ Object newValue = convertedAttributes.get(attributeName);
+ Object currentValue = getAttribute(attributeName);
+ if (currentValue == null)
+ {
+ if (newValue != null)
+ {
+ throw new IllegalConfigurationException("Cannot change value of immutable attribute " + attributeName);
+ }
+ }
+ else
+ {
+ if (!currentValue.equals(newValue))
+ {
+ throw new IllegalConfigurationException("Cannot change value of immutable attribute " + attributeName);
+ }
+ }
+ }
+ }
}
protected VirtualHost getVirtualHost()
@@ -329,4 +416,9 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
_replicatedEnvironmentFacade = replicatedEnvironmentFacade;
}
+ public Object getActualAttribute(String attributeName)
+ {
+ return super.getAttribute(attributeName);
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index f7727f6759..8c56cb9988 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -28,6 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.*;
import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
import java.io.File;
@@ -86,11 +87,16 @@ import com.sleepycat.je.utilint.PropUtil;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval";
+ public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval";
private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l;
private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL);
+ private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60;
+
+ public static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
+
@SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
@@ -134,33 +140,26 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
- private final String _prettyGroupNodeName;
- private final String _groupName;
- private final String _nodeName;
- private final String _nodeHostPort;
- private final String _helperHostPort;
+ private final LocalReplicationNode _replicationNode;
private final Durability _durability;
- private final boolean _designatedPrimary;
- private final boolean _coalescingSync;
+ private final Boolean _coalescingSync;
+ private final String _prettyGroupNodeName;
private final File _environmentDirectory;
- private final Map<String, String> _environmentParameters;
- private final Map<String, String> _replicationEnvironmentParameters;
+
private final ExecutorService _restartEnvironmentExecutor;
private final ScheduledExecutorService _groupChangeExecutor;
private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>();
private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
-
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
+
private volatile ReplicatedEnvironment _environment;
private long _joinTime;
- private String _lastKnownReplicationTransactionId;
+ private long _lastKnownReplicationTransactionId;
- @SuppressWarnings("unchecked")
- public ReplicatedEnvironmentFacade(org.apache.qpid.server.model.ReplicationNode replicationNode,
- RemoteReplicationNodeFactory remoteReplicationNodeFactory)
+ public ReplicatedEnvironmentFacade(LocalReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
_environmentDirectory = new File((String)replicationNode.getAttribute(STORE_PATH));
if (!_environmentDirectory.exists())
@@ -172,16 +171,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- _groupName = (String)replicationNode.getAttribute(GROUP_NAME);
- _nodeName = replicationNode.getName();
- _nodeHostPort = (String)replicationNode.getAttribute(HOST_PORT);;
- _helperHostPort = (String)replicationNode.getAttribute(HELPER_HOST_PORT);
- _durability = Durability.parse((String)replicationNode.getAttribute(DURABILITY));
- _designatedPrimary = (Boolean)replicationNode.getAttribute(DESIGNATED_PRIMARY);
- _coalescingSync = (Boolean)replicationNode.getAttribute(COALESCING_SYNC);
- _environmentParameters = (Map<String, String>)replicationNode.getAttribute(PARAMETERS);
- _replicationEnvironmentParameters = (Map<String, String>)replicationNode.getAttribute(REPLICATION_PARAMETERS);
- _prettyGroupNodeName = _groupName + ":" + _nodeName;
+ _replicationNode = replicationNode;
+
+ _durability = Durability.parse((String)_replicationNode.getAttribute(DURABILITY));
+ _coalescingSync = (Boolean)_replicationNode.getAttribute(COALESCING_SYNC);
+ _prettyGroupNodeName = (String)_replicationNode.getAttribute(GROUP_NAME) + ":" + _replicationNode.getName();
_restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName));
_groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
@@ -239,6 +233,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe)
{
+ //TODO: restart environment if dbe instanceof MasterReplicaTransitionException
boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException);
if (restart)
{
@@ -402,22 +397,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public String getGroupName()
{
- return _groupName;
+ return (String)_replicationNode.getAttribute(GROUP_NAME);
}
public String getNodeName()
{
- return _nodeName;
+ return _replicationNode.getName();
}
public String getHostPort()
{
- return _nodeHostPort;
+ return (String)_replicationNode.getAttribute(HOST_PORT);
}
public String getHelperHostPort()
{
- return _helperHostPort;
+ return (String)_replicationNode.getAttribute(HELPER_HOST_PORT);
}
public String getDurability()
@@ -432,8 +427,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public String getNodeState()
{
- ReplicatedEnvironment.State state = _environment.getState();
- return state.toString();
+ try
+ {
+ ReplicatedEnvironment.State state = _environment.getState();
+ return state.toString();
+ }
+ catch (IllegalStateException ise)
+ {
+ // Environment must be being recreated
+ return ReplicatedEnvironment.State.UNKNOWN.name();
+ }
}
public boolean isDesignatedPrimary()
@@ -441,11 +444,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return _environment.getRepMutableConfig().getDesignatedPrimary();
}
- public int getQuorumOverride()
- {
- return _environment.getRepMutableConfig().getElectableGroupSizeOverride();
- }
-
public List<Map<String, String>> getGroupMembers()
{
List<Map<String, String>> members = new ArrayList<Map<String, String>>();
@@ -520,28 +518,113 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- public int getPriority()
+ int getPriority()
{
ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
return repConfig.getNodePriority();
}
- public int getElectableGroupSizeOverride()
+ public void setPriority(int priority) throws AMQStoreException
+ {
+ checkNotOpeningAndEnvironmentIsValid();
+
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ // TODO: I am not sure about the exception handing here
+ throw handleDatabaseException("Cannot set priority on " + _prettyGroupNodeName, e);
+ }
+ }
+
+ private void checkNotOpeningAndEnvironmentIsValid()
+ {
+ if (_state.get() == State.OPENING)
+ {
+ throw new IllegalStateException("Environment facade is in opening state");
+ }
+
+ if (!_environment.isValid())
+ {
+ throw new IllegalStateException("Environment is not valid");
+ }
+ }
+
+ int getElectableGroupSizeOverride()
{
ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
return repConfig.getElectableGroupSizeOverride();
}
+ public void setElectableGroupSizeOverride(int electableGroupOverride) throws AMQStoreException
+ {
+ checkNotOpeningAndEnvironmentIsValid();
+
+ try
+ {
+ final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
+ final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride);
+ _environment.setRepMutableConfig(newConfig);
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ // TODO: I am not sure about the exception handing here
+ throw handleDatabaseException("Cannot set electable group size override on " + _prettyGroupNodeName, e);
+ }
+ }
+
+
public long getJoinTime()
{
return _joinTime ;
}
- public String getLastKnownReplicationTransactionId()
+ public long getLastKnownReplicationTransactionId()
{
return _lastKnownReplicationTransactionId;
}
+ public void transferMasterToSelfAsynchronously() throws AMQStoreException
+ {
+ checkNotOpeningAndEnvironmentIsValid();
+
+ _groupChangeExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ ReplicationGroupAdmin admin = createReplicationGroupAdmin();
+ String newMaster = admin.transferMaster(Collections.singleton(getNodeName()), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("The mastership has been transfered to " + newMaster);
+ }
+ }
+ catch (DatabaseException e)
+ {
+ LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName
+ + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e);
+ }
+ }
+ });
+ }
+
public ReplicatedEnvironment getEnvironment()
{
return _environment;
@@ -610,7 +693,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
final ReplicationConfig repConfig = _environment.getRepConfig();
helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
- return new ReplicationGroupAdmin(_groupName, helpers);
+ return new ReplicationGroupAdmin((String)_replicationNode.getAttribute(GROUP_NAME), helpers);
}
private void closeEnvironment()
@@ -713,35 +796,49 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ @SuppressWarnings("unchecked")
private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread)
{
+ String groupName = (String)_replicationNode.getActualAttribute(GROUP_NAME);
+ String helperHostPort = (String)_replicationNode.getActualAttribute(HELPER_HOST_PORT);
+ String hostPort = (String)_replicationNode.getActualAttribute(HOST_PORT);
+ Map<String, String> environmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(PARAMETERS);
+ Map<String, String> replicationEnvironmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(REPLICATION_PARAMETERS);
+ Boolean designatedPrimary = (Boolean)_replicationNode.getActualAttribute(DESIGNATED_PRIMARY);
+ Integer priority = (Integer)_replicationNode.getActualAttribute(PRIORITY);
+ Integer quorumOverride = (Integer)_replicationNode.getActualAttribute(QUORUM_OVERRIDE);
+
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Creating environment");
LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
- LOGGER.info("Group name " + _groupName);
- LOGGER.info("Node name " + _nodeName);
- LOGGER.info("Node host port " + _nodeHostPort);
- LOGGER.info("Helper host port " + _helperHostPort);
+ LOGGER.info("Group name " + groupName);
+ LOGGER.info("Node name " + _replicationNode.getName());
+ LOGGER.info("Node host port " + hostPort);
+ LOGGER.info("Helper host port " + helperHostPort);
LOGGER.info("Durability " + _durability);
LOGGER.info("Coalescing sync " + _coalescingSync);
- LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
+ LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
+ LOGGER.info("Node priority " + priority);
+ LOGGER.info("Quorum override " + quorumOverride);
}
Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
- if (_replicationEnvironmentParameters != null && !_replicationEnvironmentParameters.isEmpty())
+ if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
{
- replicationEnvironmentSettings.putAll(_replicationEnvironmentParameters);
+ replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
}
Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
- if (_environmentParameters != null && !_environmentParameters.isEmpty())
+ if (environmentParameters != null && !environmentParameters.isEmpty())
{
- environmentSettings.putAll(_environmentParameters);
+ environmentSettings.putAll(environmentParameters);
}
- ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
- replicationConfig.setHelperHosts(_helperHostPort);
- replicationConfig.setDesignatedPrimary(_designatedPrimary);
+ ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _replicationNode.getName(), hostPort);
+ replicationConfig.setHelperHosts(helperHostPort);
+ replicationConfig.setDesignatedPrimary(designatedPrimary);
+ replicationConfig.setNodePriority(priority);
+ replicationConfig.setElectableGroupSizeOverride(quorumOverride);
for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
{
@@ -853,9 +950,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public void run()
{
+ String groupName = (String)_replicationNode.getAttribute(GROUP_NAME);
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Checking for changes in the group " + _groupName);
+ LOGGER.debug("Checking for changes in the group " + groupName);
}
ReplicatedEnvironment env = _environment;
@@ -876,7 +974,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + _groupName + "'");
+ LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'");
}
RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
@@ -901,7 +999,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
String replicationNodeName = replicationNodeEntry.getKey();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + _groupName + "'");
+ LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + groupName + "'");
}
_remoteReplicationNodes.remove(replicationNodeName);
if (replicationGroupListener != null)
@@ -950,11 +1048,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
catch (ExecutionException e)
{
- LOGGER.warn("Cannot update node state for group " + _groupName, e.getCause());
+ LOGGER.warn("Cannot update node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME), e.getCause());
}
catch (TimeoutException e)
{
- LOGGER.warn("Timeout whilst updating node state for group " + _groupName);
+ LOGGER.warn("Timeout whilst updating node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME));
future.cancel(true);
}
}
@@ -1024,4 +1122,5 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index 0ddd7134ac..26651ac64c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.SyncPolicy;
+//TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class?
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@@ -49,6 +50,8 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
{
throw new IllegalStateException("Cannot find local replication node among virtual host nodes");
}
+ LocalReplicationNode localReplicationNode = (LocalReplicationNode)localNode;
+
String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY);
Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC);
@@ -58,8 +61,8 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
- ((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade);
+ ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localReplicationNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
+ localReplicationNode.setReplicatedEnvironmentFacade(facade);
return facade;
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
index 330abce5cf..bfe79c3dfa 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
@@ -21,11 +21,14 @@
package org.apache.qpid.server.store.berkeleydb.replication;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.UUID;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -34,6 +37,8 @@ import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+
public class LocalReplicationNodeTest extends QpidTestCase
{
@@ -41,6 +46,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
private UUID _id;
private VirtualHost _virtualHost;
private TaskExecutor _taskExecutor;
+ private ReplicatedEnvironmentFacade _facade;
@Override
public void setUp() throws Exception
@@ -48,6 +54,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
super.setUp();
_taskExecutor = mock(TaskExecutor.class);
_virtualHost = mock(VirtualHost.class);
+ _facade = mock(ReplicatedEnvironmentFacade.class);
}
@Override
@@ -129,51 +136,153 @@ public class LocalReplicationNodeTest extends QpidTestCase
assertNodeAttributes(attributes, node);
}
- public void testSetReplicatedEnvironmentFacade()
+ public void testGetValuesFromReplicatedEnvironmentFacade()
{
- long joinTime = System.currentTimeMillis();
- int priority = 1;
- String masterState = "MASTER";
- int quorumOverride = 2;
- int port = 9999;
- String hostPort = "localhost:" + port;
- String groupName = getTestName();
- String storePath = TMP_FOLDER + File.separator + groupName;
- boolean designatedPrimary = true;
- String nodeName = "nodeName";
-
- Map<String, Object> attributes = createValidAttributes();
- attributes.put(ReplicationNode.HOST_PORT, hostPort);
- attributes.put(ReplicationNode.HELPER_HOST_PORT, hostPort);
- attributes.put(ReplicationNode.STORE_PATH, storePath);
- attributes.put(ReplicationNode.DESIGNATED_PRIMARY, designatedPrimary);
- attributes.put(ReplicationNode.GROUP_NAME, groupName);
- attributes.put(ReplicationNode.NAME, nodeName);
- LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
assertNull("Unexpected role attribute", node.getAttribute(ReplicationNode.ROLE));
- assertNull("Unexpected quorum override attribute", node.getAttribute(ReplicationNode.QUORUM_OVERRIDE));
- assertNull("Unexpected priority attribute", node.getAttribute(ReplicationNode.PRIORITY));
assertNull("Unexpected join time attribute", node.getAttribute(ReplicationNode.JOIN_TIME));
+ assertNull("Unexpected last transaction id", node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID));
+ assertEquals("Unexpected priority attribute", LocalReplicationNode.DEFAULT_PRIORITY, node.getAttribute(ReplicationNode.PRIORITY));
+ assertEquals("Unexpected quorum override attribute", LocalReplicationNode.DEFAULT_QUORUM_OVERRIDE, node.getAttribute(ReplicationNode.QUORUM_OVERRIDE));
+ assertEquals("Unexpected designated primary attribute", LocalReplicationNode.DEFAULT_DESIGNATED_PRIMARY, node.getAttribute(ReplicationNode.DESIGNATED_PRIMARY));
+
+ String masterState = "MASTER";
+ long joinTime = System.currentTimeMillis();
+ long lastKnowTransactionId = 1000l;
+ boolean designatedPrimary = true;
+ int priority = 2;
+ int quorumOverride = 3;
+
+ when(_facade.getNodeState()).thenReturn(masterState);
+ when(_facade.getJoinTime()).thenReturn(joinTime);
+ when(_facade.getLastKnownReplicationTransactionId()).thenReturn(lastKnowTransactionId);
+ when(_facade.isDesignatedPrimary()).thenReturn(designatedPrimary);
+ when(_facade.getPriority()).thenReturn(priority);
+ when(_facade.getElectableGroupSizeOverride()).thenReturn(quorumOverride);
- ReplicatedEnvironmentFacade facade = mock(ReplicatedEnvironmentFacade.class);
- when(facade.getNodeState()).thenReturn(masterState);
- when(facade.getPriority()).thenReturn(priority);
- when(facade.getJoinTime()).thenReturn(joinTime);
- when(facade.getQuorumOverride()).thenReturn(quorumOverride);
- when(facade.getGroupName()).thenReturn(groupName);
- when(facade.getHelperHostPort()).thenReturn(hostPort);
- when(facade.getHostPort()).thenReturn(hostPort);
- when(facade.isDesignatedPrimary()).thenReturn(designatedPrimary);
- when(facade.getNodeName()).thenReturn(nodeName);
-
- node.setReplicatedEnvironmentFacade(facade);
+ node.setReplicatedEnvironmentFacade(_facade);
assertEquals("Unexpected role attribute", masterState, node.getAttribute(ReplicationNode.ROLE));
- assertEquals("Unexpected quorum override attribute", quorumOverride, node.getAttribute(ReplicationNode.QUORUM_OVERRIDE));
- assertEquals("Unexpected priority attribute", priority, node.getAttribute(ReplicationNode.PRIORITY));
assertEquals("Unexpected join time attribute", joinTime, node.getAttribute(ReplicationNode.JOIN_TIME));
+ assertEquals("Unexpected last transaction id attribute", lastKnowTransactionId, node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID));
+ assertEquals("Unexpected priority attribute", priority, node.getAttribute(ReplicationNode.PRIORITY));
+ assertEquals("Unexpected quorum override attribute", quorumOverride, node.getAttribute(ReplicationNode.QUORUM_OVERRIDE));
+ }
- assertNodeAttributes(attributes, node);
+ public void testSetDesignatedPrimary() throws Exception
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ node.setReplicatedEnvironmentFacade(_facade);
+
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIGNATED_PRIMARY, true));
+
+ verify(_facade).setDesignatedPrimary(true);
+
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIGNATED_PRIMARY, false));
+ verify(_facade).setDesignatedPrimary(false);
+ }
+
+ public void testSetPriority() throws Exception
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ node.setReplicatedEnvironmentFacade(_facade);
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.PRIORITY, 100));
+
+ verify(_facade).setPriority(100);
+ }
+
+ public void testSetQuorumOverride() throws Exception
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ node.setReplicatedEnvironmentFacade(_facade);
+
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.QUORUM_OVERRIDE, 10));
+
+ verify(_facade).setElectableGroupSizeOverride(10);
+ }
+
+ public void testSetRole() throws Exception
+ {
+ when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name());
+
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ node.setReplicatedEnvironmentFacade(_facade);
+
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name()));
+
+ verify(_facade).transferMasterToSelfAsynchronously();
+ }
+
+ public void testSetRoleToReplicaUnsupported() throws Exception
+ {
+ when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name());
+
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ node.setReplicatedEnvironmentFacade(_facade);
+
+ try
+ {
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.REPLICA.name()));
+ fail("Exception not thrown");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testSetRoleWhenCurrentRoleNotRepliaIsUnsupported() throws Exception
+ {
+ when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.MASTER.name());
+
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ node.setReplicatedEnvironmentFacade(_facade);
+
+ try
+ {
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name()));
+ fail("Exception not thrown");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testSetImmutableAttributesThrowException() throws Exception
+ {
+ Map<String, Object> changeAttributeMap = new HashMap<String, Object>();
+ changeAttributeMap.put(ReplicationNode.GROUP_NAME, "newGroupName");
+ changeAttributeMap.put(ReplicationNode.HELPER_HOST_PORT, "newhost:1234");
+ changeAttributeMap.put(ReplicationNode.HOST_PORT, "newhost:1234");
+ changeAttributeMap.put(ReplicationNode.COALESCING_SYNC, Boolean.FALSE);
+ changeAttributeMap.put(ReplicationNode.DURABILITY, "durability");
+ changeAttributeMap.put(ReplicationNode.JOIN_TIME, 1000l);
+ changeAttributeMap.put(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID, 10001l);
+ changeAttributeMap.put(ReplicationNode.NAME, "newName");
+ changeAttributeMap.put(ReplicationNode.STORE_PATH, "/not/used");
+ changeAttributeMap.put(ReplicationNode.PARAMETERS, Collections.emptyMap());
+ changeAttributeMap.put(ReplicationNode.REPLICATION_PARAMETERS, Collections.emptyMap());
+
+ for (Entry<String, Object> entry : changeAttributeMap.entrySet())
+ {
+ assertSetAttributesThrowsException(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void assertSetAttributesThrowsException(String attributeName, Object attributeValue)
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+
+ try
+ {
+ node.setAttributes(Collections.<String, Object>singletonMap(attributeName, attributeValue));
+ fail("Operation to change attribute '" + attributeName + "' should fail");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // pass
+ }
}
private Map<String, Object> createValidAttributes()
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index cea7d52d43..1bce41403f 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -28,7 +28,7 @@ import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
import static org.apache.qpid.server.model.ReplicationNode.NAME;
import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
-import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
+import static org.apache.qpid.server.model.ReplicationNode.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -79,6 +79,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private static final String TEST_DURABILITY = Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
private static final boolean TEST_DESIGNATED_PRIMARY = false;
private static final boolean TEST_COALESCING_SYNC = true;
+ private static final int TEST_PRIORITY = 10;
+ private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
private File _storePath;
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
@@ -122,7 +124,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
}
public void testEnvironmentFacade() throws Exception
{
- EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ EnvironmentFacade ef = createMaster();
assertNotNull("Environment should not be null", ef);
Environment e = ef.getEnvironment();
assertTrue("Environment is not valid", e.isValid());
@@ -130,7 +132,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testClose() throws Exception
{
- EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ EnvironmentFacade ef = createMaster();
ef.close();
Environment e = ef.getEnvironment();
@@ -139,7 +141,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testOpenDatabases() throws Exception
{
- EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ EnvironmentFacade ef = createMaster();
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
@@ -153,7 +155,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testGetOpenDatabaseForNonExistingDatabase() throws Exception
{
- EnvironmentFacade ef = (ReplicatedEnvironmentFacade) createMaster();
+ EnvironmentFacade ef = createMaster();
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
@@ -173,50 +175,42 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testGetGroupName() throws Exception
{
- assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName());
+ assertEquals("Unexpected group name", TEST_GROUP_NAME, createMaster().getGroupName());
}
public void testGetNodeName() throws Exception
{
- assertEquals("Unexpected group name", TEST_NODE_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getNodeName());
+ assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
}
public void testGetNodeHostPort() throws Exception
{
- assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHostPort());
+ assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
}
public void testGetHelperHostPort() throws Exception
{
- assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, ((ReplicatedEnvironmentFacade) createMaster()).getHelperHostPort());
+ assertEquals("Unexpected node helper host port", TEST_NODE_HELPER_HOST_PORT, createMaster().getHelperHostPort());
}
public void testGetDurability() throws Exception
{
- assertEquals("Unexpected durability", TEST_DURABILITY.toString(), ((ReplicatedEnvironmentFacade) createMaster()).getDurability());
+ assertEquals("Unexpected durability", TEST_DURABILITY.toString(), createMaster().getDurability());
}
public void testIsCoalescingSync() throws Exception
{
- assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, ((ReplicatedEnvironmentFacade) createMaster()).isCoalescingSync());
+ assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, createMaster().isCoalescingSync());
}
public void testGetNodeState() throws Exception
{
- assertEquals("Unexpected state", State.MASTER.name(), ((ReplicatedEnvironmentFacade) createMaster()).getNodeState());
- }
-
- public void testIsDesignatedPrimary() throws Exception
- {
- ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
- assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
- master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
- assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ assertEquals("Unexpected state", State.MASTER.name(), createMaster().getNodeState());
}
public void testGetGroupMembers() throws Exception
{
- List<Map<String, String>> groupMembers = ((ReplicatedEnvironmentFacade) createMaster()).getGroupMembers();
+ List<Map<String, String>> groupMembers = createMaster().getGroupMembers();
Map<String, String> expectedMember = new HashMap<String, String>();
expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_NAME, TEST_NODE_NAME);
expectedMember.put(ReplicatedEnvironmentFacade.GRP_MEM_COL_NODE_HOST_PORT, TEST_NODE_HOST_PORT);
@@ -224,9 +218,35 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected group members", expectedGroupMembers, new HashSet<Map<String, String>>(groupMembers));
}
+ public void testPriority() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority());
+
+ facade.setPriority(TEST_PRIORITY + 1);
+ assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority());
+ }
+
+ public void testDesignatedPrimary() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+ assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
+ }
+
+
+ public void testElectableGroupSizeOverride() throws Exception
+ {
+ ReplicatedEnvironmentFacade facade = createMaster();
+ assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride());
+ facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
+ assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
+ }
+
public void testReplicationGroupListenerHearsAboutExistingRemoteReplicationNodes() throws Exception
{
- ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
+ ReplicatedEnvironmentFacade master = createMaster();
String nodeName2 = TEST_NODE_NAME + "_2";
String host = "localhost";
int port = getNextAvailable(TEST_NODE_PORT + 1);
@@ -383,7 +403,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testRemoveNodeFromGroup() throws Exception
{
- ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster();
+ ReplicatedEnvironmentFacade environmentFacade = createMaster();
String node2Name = TEST_NODE_NAME + "_2";
String node2NodeHostPort = "localhost:" + getNextAvailable(TEST_NODE_PORT + 1);
@@ -398,26 +418,9 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected group members count", 1, groupMembers.size());
}
- public void testSetDesignatedPrimary() throws Exception
- {
- ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) createMaster();
- environmentFacade.setDesignatedPrimary(false);
- assertFalse("Unexpected designated primary", environmentFacade.isDesignatedPrimary());
- }
-
- public void testGetNodePriority() throws Exception
- {
- assertEquals("Unexpected node priority", 1, ((ReplicatedEnvironmentFacade) createMaster()).getPriority());
- }
-
- public void testGetElectableGroupSizeOverride() throws Exception
- {
- assertEquals("Unexpected Electable Group Size Override", 0, ((ReplicatedEnvironmentFacade) createMaster()).getElectableGroupSizeOverride());
- }
-
public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
{
- ReplicatedEnvironmentFacade master = (ReplicatedEnvironmentFacade) createMaster();
+ ReplicatedEnvironmentFacade master = createMaster();
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
String replica1NodeName = TEST_NODE_NAME + "_1";
@@ -545,11 +548,12 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertTrue("Replica " + nodeName + " was not started", testStateChangeListener.awaitForStateChange(LISTENER_TIMEOUT, TimeUnit.SECONDS));
return replicaEnvironmentFacade;
}
+
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
- ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
+ LocalReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory);
ref.setReplicationGroupListener(replicationGroupListener);
ref.setStateChangeListener(stateChangeListener);
@@ -571,18 +575,31 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
return dbConfig;
}
- private ReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary)
+ private LocalReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary)
{
- ReplicationNode node = mock(ReplicationNode.class);
+ LocalReplicationNode node = mock(LocalReplicationNode.class);
when(node.getAttribute(NAME)).thenReturn(nodeName);
when(node.getName()).thenReturn(nodeName);
when(node.getAttribute(HOST_PORT)).thenReturn(nodeHostPort);
when(node.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary);
+ when(node.getAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
+ when(node.getAttribute(PRIORITY)).thenReturn(TEST_PRIORITY);
when(node.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME);
when(node.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT);
when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC);
+
+ // TMP REF contract with LRN is too complicated.
+ when(node.getActualAttribute(HOST_PORT)).thenReturn(nodeHostPort);
+ when(node.getActualAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary);
+ when(node.getActualAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
+ when(node.getActualAttribute(PRIORITY)).thenReturn(TEST_PRIORITY);
+ when(node.getActualAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME);
+ when(node.getActualAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT);
+ when(node.getActualAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
+ when(node.getActualAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC);
+
Map<String, String> repConfig = new HashMap<String, String>();
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
new file mode 100644
index 0000000000..b70703c526
--- /dev/null
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.systest.rest.QpidRestTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+public class ReplicationNodeRestTest extends QpidRestTestCase
+{
+ private static final String NODE_NAME = "node1";
+ private static final String GROUP_NAME = "replication-group";
+
+ private String _hostName;
+ private File _storeFile;
+ private int _haPort;
+ private String _nodeRestUrl;
+ private String _hostRestUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _hostName = getTestName();
+ _nodeRestUrl = "/rest/replicationnode/" + _hostName + "/" + NODE_NAME;
+ _hostRestUrl = "/rest/virtualhost/" + _hostName;
+
+ _storeFile = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
+ _haPort = findFreePort();
+
+ Map<String, Object> hostData = new HashMap<String, Object>();
+ hostData.put(VirtualHost.NAME, _hostName);
+ hostData.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
+ hostData.put(VirtualHost.DESIRED_STATE, State.QUIESCED);
+
+ int responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", hostData);
+ assertEquals("Unexpected response code for virtual host creation request", 201, responseCode);
+
+ String hostPort = "localhost:" + _haPort;
+ Map<String, Object> nodeData = new HashMap<String, Object>();
+ nodeData.put(ReplicationNode.NAME, NODE_NAME);
+ nodeData.put(ReplicationNode.GROUP_NAME, GROUP_NAME);
+ nodeData.put(ReplicationNode.HOST_PORT, hostPort);
+ nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort);
+ nodeData.put(ReplicationNode.STORE_PATH, _storeFile.getAbsolutePath());
+
+ responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", nodeData);
+ assertEquals("Unexpected response code for node creation request", 201, responseCode);
+
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ if (_storeFile != null)
+ {
+ FileUtils.delete(_storeFile, true);
+ }
+ }
+ }
+
+ public void testChangePriority() throws Exception
+ {
+ assertReplicationNodeSetAttribute(ReplicationNode.PRIORITY, 1, 2, 3);
+ }
+
+ public void testChangeQuorumOverride() throws Exception
+ {
+ assertReplicationNodeSetAttribute(ReplicationNode.QUORUM_OVERRIDE, 0, 1, 2);
+ }
+
+ public void testChangeDesignatedPrimary() throws Exception
+ {
+ assertReplicationNodeSetAttribute(ReplicationNode.DESIGNATED_PRIMARY, false, true, false);
+ }
+
+ public void testCreationOfSecondLocalReplicationNodeFails() throws Exception
+ {
+ String hostPort = "localhost:" + _haPort;
+ Map<String, Object> nodeData = new HashMap<String, Object>();
+ nodeData.put(ReplicationNode.NAME, NODE_NAME + 1);
+ nodeData.put(ReplicationNode.GROUP_NAME, GROUP_NAME);
+ nodeData.put(ReplicationNode.HOST_PORT, hostPort);
+ nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort);
+ nodeData.put(ReplicationNode.STORE_PATH, _storeFile.getAbsolutePath());
+
+ int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl + 1, "PUT", nodeData);
+ assertEquals("Adding of a second replication node should fail", 409, responseCode);
+ }
+
+ public void testUpdateImmutableAttributeWithTheSameValueSucceeds() throws Exception
+ {
+ String hostPort = "localhost:" + _haPort;
+ Map<String, Object> nodeData = new HashMap<String, Object>();
+ nodeData.put(ReplicationNode.NAME, NODE_NAME);
+ nodeData.put(ReplicationNode.GROUP_NAME, GROUP_NAME);
+ nodeData.put(ReplicationNode.HOST_PORT, hostPort);
+ nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort);
+ nodeData.put(ReplicationNode.STORE_PATH, _storeFile.getAbsolutePath());
+
+ int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", nodeData);
+ assertEquals("Update with unchanged attribute should succeed", 200, responseCode);
+ }
+
+ private void assertReplicationNodeSetAttribute(String attributeName, Object initialValue,
+ Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws IOException, JsonGenerationException,
+ JsonMappingException
+ {
+ Map<String, Object> nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
+ assertEquals("Unexpected " + attributeName + " after creation", initialValue, nodeAttributes.get(attributeName));
+
+ int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(attributeName, newValueBeforeHostActivation));
+ assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode);
+
+ nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
+ assertEquals("Unexpected " + attributeName + " after update but before host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName));
+
+ responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE));
+ assertEquals("Unexpected response code for virtual host update status", 200, responseCode);
+
+ nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
+ assertEquals("Unexpected " + attributeName + " after host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName));
+
+ responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(attributeName, newValueAfterHostActivation));
+ assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode);
+
+ nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
+ assertEquals("Unexpected " + attributeName + " after update after host activation", newValueAfterHostActivation, nodeAttributes.get(attributeName));
+ }
+}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java
index 9811891886..b4a3661039 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/VirtualHostRestTest.java
@@ -36,21 +36,49 @@ public class VirtualHostRestTest extends QpidRestTestCase
{
private static final String VIRTUALHOST_NODES_ATTRIBUTE = "replicationnodes";
+ private File _storeFile;
+ private String _hostName;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _hostName = getTestName();
+
+ _storeFile = new File(TMP_FOLDER, "store-" + _hostName + "-" + System.currentTimeMillis());
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ if (_storeFile != null)
+ {
+ FileUtils.delete(_storeFile, true);
+ }
+ }
+ }
public void testPutCreateHAVirtualHost() throws Exception
{
Map<String, Object> hostData = new HashMap<String, Object>();
- String hostName = getTestName();
- hostData.put(VirtualHost.NAME, hostName);
+ hostData.put(VirtualHost.NAME, _hostName);
hostData.put(VirtualHost.TYPE, BDBHAVirtualHostFactory.TYPE);
hostData.put(VirtualHost.DESIRED_STATE, State.QUIESCED);
- int responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData);
+ int responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + _hostName, "PUT", hostData);
assertEquals("Unexpected response code for virtual host creation request", 201, responseCode);
- // TODO should observe vh state
+ Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + _hostName);
+ assertEquals("Virtual host in unexpected desired state ", State.QUIESCED.name(), hostDetails.get(VirtualHost.DESIRED_STATE));
+ assertEquals("Virtual host in unexpected actual state ", State.QUIESCED.name(), hostDetails.get(VirtualHost.STATE));
- String storeLocation = new File(TMP_FOLDER, "store-" + hostName + "-" + System.currentTimeMillis()).getAbsolutePath();
+ String storeLocation = _storeFile.getAbsolutePath();
String nodeName = "node1";
String groupName = "replication-group";
int port = findFreePort();
@@ -63,44 +91,35 @@ public class VirtualHostRestTest extends QpidRestTestCase
nodeData.put(ReplicationNode.HELPER_HOST_PORT, hostPort);
nodeData.put(ReplicationNode.STORE_PATH, storeLocation);
- String createNodeUrl = "/rest/replicationnode/" + hostName + "/" + nodeName;
+ String createNodeUrl = "/rest/replicationnode/" + _hostName + "/" + nodeName;
responseCode = getRestTestHelper().submitRequest(createNodeUrl, "PUT", nodeData);
assertEquals("Unexpected response code for node creation request", 201, responseCode);
hostData.clear();
hostData.put(VirtualHost.DESIRED_STATE, State.ACTIVE);
- responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + hostName, "PUT", hostData);
+ responseCode = getRestTestHelper().submitRequest("/rest/virtualhost/" + _hostName, "PUT", hostData);
assertEquals("Unexpected response code for virtual host update status", 200, responseCode);
- waitForVirtualHostActivation(hostName, 10000l);
+ waitForVirtualHostActivation(_hostName, 10000l);
- Map<String, Object> replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + hostName + "/" + nodeName);
+ Map<String, Object> replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + _hostName + "/" + nodeName);
assertLocalNode(nodeData, replicationNodeDetails);
- try
- {
- // make sure that the host is saved in the broker store
- restartBroker();
- Map<String, Object> hostDetails = waitForVirtualHostActivation(hostName, 10000l);
- Asserts.assertVirtualHost(hostName, hostDetails);
- assertEquals("Unexpected virtual host type", BDBHAVirtualHostFactory.TYPE.toString(), hostDetails.get(VirtualHost.TYPE));
+ // make sure that the host is saved in the broker store
+ restartBroker();
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> nodes = (List<Map<String, Object>>) hostDetails.get(VIRTUALHOST_NODES_ATTRIBUTE);
- assertEquals("Unexpected number of nodes", 1, nodes.size());
- assertLocalNode(nodeData, nodes.get(0));
+ hostDetails = waitForVirtualHostActivation(_hostName, 10000l);
+ Asserts.assertVirtualHost(_hostName, hostDetails);
+ assertEquals("Unexpected virtual host type", BDBHAVirtualHostFactory.TYPE.toString(), hostDetails.get(VirtualHost.TYPE));
- // verify that that node rest interface returns the same node attributes
- replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + hostName + "/" + nodeName);
- assertLocalNode(nodeData, replicationNodeDetails);
- }
- finally
- {
- if (storeLocation != null)
- {
- FileUtils.delete(new File(storeLocation), true);
- }
- }
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> nodes = (List<Map<String, Object>>) hostDetails.get(VIRTUALHOST_NODES_ATTRIBUTE);
+ assertEquals("Unexpected number of nodes", 1, nodes.size());
+ assertLocalNode(nodeData, nodes.get(0));
+
+ // verify that that node rest interface returns the same node attributes
+ replicationNodeDetails = getRestTestHelper().getJsonAsSingletonList("/rest/replicationnode/" + _hostName + "/" + nodeName);
+ assertLocalNode(nodeData, replicationNodeDetails);
}
private void assertLocalNode(Map<String, Object> expectedNodeAttributes, Map<String, Object> actualNodesAttributes)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java
index 1d064347ad..71002d9f72 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java
@@ -54,7 +54,7 @@ public interface ReplicationNode extends ConfiguredObject
/** A designated primary setting for 2-nodes group*/
String DESIGNATED_PRIMARY = "designatedPrimary";
- /** Node priority*/
+ /** Node priority. 1 signifies normal priority; 0 signifies node will never be elected. */
String PRIORITY = "priority";
/** The overridden minimum number of group nodes required to commit transaction on this node instead of simple majority*/