summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-02-06 17:34:49 +0000
committerKeith Wall <kwall@apache.org>2014-02-06 17:34:49 +0000
commitb9a025557f13f69d742f62aa0f2c099cd506add9 (patch)
tree6e15ea31b653cc9d37e27370f19023a96800cfa7
parent1ee354e69fe9863274ba5e42d63adbe2bf9309a2 (diff)
downloadqpid-python-b9a025557f13f69d742f62aa0f2c099cd506add9.tar.gz
QPID-5409: Environment thread is now used to perform set designated primary/set electable group override/set node prioriy
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1565370 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java99
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java12
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java5
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java24
5 files changed, 108 insertions, 38 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
index e5e0f866fa..ef2c48463b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
@@ -284,7 +284,11 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
public boolean changeAttribute(final String name, final Object expected, final Object desired)
{
updateReplicatedEnvironmentFacade(name, desired);
- return super.changeAttribute(name, expected, desired);
+ if (!ROLE.equals(name))
+ {
+ return super.changeAttribute(name, expected, desired);
+ }
+ return false;
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index e32ded5dd5..dc0b1b2ff1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -80,6 +80,7 @@ import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.ReplicationGroup;
import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.RestartRequiredException;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.DbPing;
@@ -153,7 +154,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final String _prettyGroupNodeName;
private final File _environmentDirectory;
- private final ExecutorService _restartEnvironmentExecutor;
+ private final ExecutorService _environmentJobExecutor;
private final ScheduledExecutorService _groupChangeExecutor;
private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
@@ -183,7 +184,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_coalescingSync = (Boolean)_replicationNode.getAttribute(COALESCING_SYNC);
_prettyGroupNodeName = (String)_replicationNode.getAttribute(GROUP_NAME) + ":" + _replicationNode.getName();
- _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName));
+ // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
+ _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
_remoteReplicationNodeFactory = remoteReplicationNodeFactory;
@@ -224,7 +226,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
}
- _restartEnvironmentExecutor.shutdown();
+ _environmentJobExecutor.shutdown();
_groupChangeExecutor.shutdown();
closeDatabases();
closeEnvironment();
@@ -239,8 +241,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe)
{
- //TODO: restart environment if dbe instanceof MasterReplicaTransitionException
- boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException);
+ boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException);
if (restart)
{
if (_state.compareAndSet(State.OPEN, State.RESTARTING))
@@ -249,7 +250,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
}
- _restartEnvironmentExecutor.execute(new Runnable()
+
+ _environmentJobExecutor.execute(new Runnable()
{
@Override
public void run()
@@ -436,20 +438,36 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return state.toString();
}
+ public void removeNodeFromGroup(final String nodeName)
+ {
+ createReplicationGroupAdmin().removeMember(nodeName);
+ }
+
public boolean isDesignatedPrimary()
{
return _environment.getRepMutableConfig().getDesignatedPrimary();
}
- public void removeNodeFromGroup(final String nodeName)
+ public Future<Void> setDesignatedPrimary(final boolean isPrimary)
{
- createReplicationGroupAdmin().removeMember(nodeName);
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setDesignatedPrimaryInternal(isPrimary);
+ return null;
+ }
+ });
}
- public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException
+ private void setDesignatedPrimaryInternal(final boolean isPrimary)
{
- // TODO : we have a race if the RE is being restarted?
- // if (restarting) put setDesignatedPrimary job in queue???
try
{
final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
@@ -461,10 +479,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary);
}
}
- catch (DatabaseException e)
+ catch (Exception e)
{
- // TODO: I am not sure about the exception handing here
- throw handleDatabaseException("Cannot set designated primary", e);
+ LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
}
}
@@ -474,7 +491,25 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return repConfig.getNodePriority();
}
- public void setPriority(int priority) throws AMQStoreException
+ public Future<Void> setPriority(final int priority)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setPriorityInternal(priority);
+ return null;
+ }
+ });
+ }
+
+ private void setPriorityInternal(int priority)
{
try
{
@@ -487,10 +522,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority);
}
}
- catch (DatabaseException e)
+ catch (Exception e)
{
- // TODO: I am not sure about the exception handing here
- throw handleDatabaseException("Cannot set priority on " + _prettyGroupNodeName, e);
+ LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
}
}
@@ -500,7 +534,25 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return repConfig.getElectableGroupSizeOverride();
}
- public void setElectableGroupSizeOverride(int electableGroupOverride) throws AMQStoreException
+ public Future<Void> setElectableGroupSizeOverride(final int electableGroupOverride)
+ {
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride);
+ }
+
+ return _environmentJobExecutor.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ setElectableGroupSizeOverrideInternal(electableGroupOverride);
+ return null;
+ }
+ });
+ }
+
+ private void setElectableGroupSizeOverrideInternal(int electableGroupOverride)
{
try
{
@@ -513,10 +565,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride);
}
}
- catch (DatabaseException e)
+ catch (Exception e)
{
- // TODO: I am not sure about the exception handing here
- throw handleDatabaseException("Cannot set electable group size override on " + _prettyGroupNodeName, e);
+ LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
}
}
@@ -666,7 +717,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- private void restartEnvironment(DatabaseException dbe) throws AMQStoreException
+ private void restartEnvironment(DatabaseException dbe)
{
LOGGER.info("Restarting environment");
@@ -828,7 +879,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig,
final ReplicationConfig replicationConfig)
{
- Future<ReplicatedEnvironment> environmentFuture = _restartEnvironmentExecutor.submit(new Callable<ReplicatedEnvironment>(){
+ Future<ReplicatedEnvironment> environmentFuture = _environmentJobExecutor.submit(new Callable<ReplicatedEnvironment>(){
@Override
public ReplicatedEnvironment call() throws Exception
{
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 45117b17c2..052341c810 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -221,8 +222,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority());
-
- facade.setPriority(TEST_PRIORITY + 1);
+ Future<Void> future = facade.setPriority(TEST_PRIORITY + 1);
+ future.get(5, TimeUnit.SECONDS);
assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority());
}
@@ -230,16 +231,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
ReplicatedEnvironmentFacade master = createMaster();
assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
- master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+ Future<Void> future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY);
+ future.get(5, TimeUnit.SECONDS);
assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary());
}
-
public void testElectableGroupSizeOverride() throws Exception
{
ReplicatedEnvironmentFacade facade = createMaster();
assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride());
- facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
+ Future<Void> future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1);
+ future.get(5, TimeUnit.SECONDS);
assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride());
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
index 52b873384d..9adc834f95 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
@@ -204,6 +204,11 @@ public class HAClusterManagementTest extends QpidBrokerTestCase
final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort);
assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary());
storeBean.setDesignatedPrimary(true);
+ long limit = System.currentTimeMillis() + 5000;
+ while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit)
+ {
+ Thread.sleep(100l);
+ }
assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary());
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
index b70703c526..c757a5d99c 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
@@ -137,8 +137,7 @@ public class ReplicationNodeRestTest extends QpidRestTestCase
}
private void assertReplicationNodeSetAttribute(String attributeName, Object initialValue,
- Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws IOException, JsonGenerationException,
- JsonMappingException
+ Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws Exception
{
Map<String, Object> nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
assertEquals("Unexpected " + attributeName + " after creation", initialValue, nodeAttributes.get(attributeName));
@@ -146,19 +145,28 @@ public class ReplicationNodeRestTest extends QpidRestTestCase
int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(attributeName, newValueBeforeHostActivation));
assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode);
- nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
- assertEquals("Unexpected " + attributeName + " after update but before host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName));
+ waitForAttributeChanged(attributeName, newValueBeforeHostActivation);
responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE));
assertEquals("Unexpected response code for virtual host update status", 200, responseCode);
- nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
- assertEquals("Unexpected " + attributeName + " after host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName));
+ waitForAttributeChanged(attributeName, newValueBeforeHostActivation);
responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(attributeName, newValueAfterHostActivation));
assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode);
- nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
- assertEquals("Unexpected " + attributeName + " after update after host activation", newValueAfterHostActivation, nodeAttributes.get(attributeName));
+ waitForAttributeChanged(attributeName, newValueAfterHostActivation);
+ }
+
+ private void waitForAttributeChanged(String attributeName, Object newValue) throws Exception
+ {
+ Map<String, Object> nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
+ long limit = System.currentTimeMillis() + 5000;
+ while(!newValue.equals(nodeAttributes.get(attributeName)) && System.currentTimeMillis() < limit)
+ {
+ Thread.sleep(100l);
+ nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl);
+ }
+ assertEquals("Unexpected attribute " + attributeName, newValue, nodeAttributes.get(attributeName));
}
}