From b9a025557f13f69d742f62aa0f2c099cd506add9 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 6 Feb 2014 17:34:49 +0000 Subject: QPID-5409: Environment thread is now used to perform set designated primary/set electable group override/set node prioriy git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1565370 13f79535-47bb-0310-9956-ffa450edef68 --- .../replication/LocalReplicationNode.java | 6 +- .../replication/ReplicatedEnvironmentFacade.java | 99 ++++++++++++++++------ .../ReplicatedEnvironmentFacadeTest.java | 12 +-- .../store/berkeleydb/HAClusterManagementTest.java | 5 ++ .../store/berkeleydb/ReplicationNodeRestTest.java | 24 ++++-- 5 files changed, 108 insertions(+), 38 deletions(-) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java index e5e0f866fa..ef2c48463b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java @@ -284,7 +284,11 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication public boolean changeAttribute(final String name, final Object expected, final Object desired) { updateReplicatedEnvironmentFacade(name, desired); - return super.changeAttribute(name, expected, desired); + if (!ROLE.equals(name)) + { + return super.changeAttribute(name, expected, desired); + } + return false; } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index e32ded5dd5..dc0b1b2ff1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -80,6 +80,7 @@ import com.sleepycat.je.rep.ReplicationConfig; import com.sleepycat.je.rep.ReplicationGroup; import com.sleepycat.je.rep.ReplicationMutableConfig; import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.RestartRequiredException; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; import com.sleepycat.je.rep.util.DbPing; @@ -153,7 +154,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final String _prettyGroupNodeName; private final File _environmentDirectory; - private final ExecutorService _restartEnvironmentExecutor; + private final ExecutorService _environmentJobExecutor; private final ScheduledExecutorService _groupChangeExecutor; private final AtomicReference _state = new AtomicReference(State.OPENING); private final ConcurrentMap _databases = new ConcurrentHashMap(); @@ -183,7 +184,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan _coalescingSync = (Boolean)_replicationNode.getAttribute(COALESCING_SYNC); _prettyGroupNodeName = (String)_replicationNode.getAttribute(GROUP_NAME) + ":" + _replicationNode.getName(); - _restartEnvironmentExecutor = Executors.newFixedThreadPool(1, new DaemonThreadFactory("Environment-Starter:" + _prettyGroupNodeName)); + // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread + _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); _remoteReplicationNodeFactory = remoteReplicationNodeFactory; @@ -224,7 +226,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); } - _restartEnvironmentExecutor.shutdown(); + _environmentJobExecutor.shutdown(); _groupChangeExecutor.shutdown(); closeDatabases(); closeEnvironment(); @@ -239,8 +241,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe) { - //TODO: restart environment if dbe instanceof MasterReplicaTransitionException - boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException); + boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException); if (restart) { if (_state.compareAndSet(State.OPEN, State.RESTARTING)) @@ -249,7 +250,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe); } - _restartEnvironmentExecutor.execute(new Runnable() + + _environmentJobExecutor.execute(new Runnable() { @Override public void run() @@ -436,20 +438,36 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return state.toString(); } + public void removeNodeFromGroup(final String nodeName) + { + createReplicationGroupAdmin().removeMember(nodeName); + } + public boolean isDesignatedPrimary() { return _environment.getRepMutableConfig().getDesignatedPrimary(); } - public void removeNodeFromGroup(final String nodeName) + public Future setDesignatedPrimary(final boolean isPrimary) { - createReplicationGroupAdmin().removeMember(nodeName); + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary); + } + + return _environmentJobExecutor.submit(new Callable() + { + @Override + public Void call() + { + setDesignatedPrimaryInternal(isPrimary); + return null; + } + }); } - public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException + private void setDesignatedPrimaryInternal(final boolean isPrimary) { - // TODO : we have a race if the RE is being restarted? - // if (restarting) put setDesignatedPrimary job in queue??? try { final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); @@ -461,10 +479,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary); } } - catch (DatabaseException e) + catch (Exception e) { - // TODO: I am not sure about the exception handing here - throw handleDatabaseException("Cannot set designated primary", e); + LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e); } } @@ -474,7 +491,25 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return repConfig.getNodePriority(); } - public void setPriority(int priority) throws AMQStoreException + public Future setPriority(final int priority) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority); + } + + return _environmentJobExecutor.submit(new Callable() + { + @Override + public Void call() + { + setPriorityInternal(priority); + return null; + } + }); + } + + private void setPriorityInternal(int priority) { try { @@ -487,10 +522,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority); } } - catch (DatabaseException e) + catch (Exception e) { - // TODO: I am not sure about the exception handing here - throw handleDatabaseException("Cannot set priority on " + _prettyGroupNodeName, e); + LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e); } } @@ -500,7 +534,25 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return repConfig.getElectableGroupSizeOverride(); } - public void setElectableGroupSizeOverride(int electableGroupOverride) throws AMQStoreException + public Future setElectableGroupSizeOverride(final int electableGroupOverride) + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride); + } + + return _environmentJobExecutor.submit(new Callable() + { + @Override + public Void call() + { + setElectableGroupSizeOverrideInternal(electableGroupOverride); + return null; + } + }); + } + + private void setElectableGroupSizeOverrideInternal(int electableGroupOverride) { try { @@ -513,10 +565,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride); } } - catch (DatabaseException e) + catch (Exception e) { - // TODO: I am not sure about the exception handing here - throw handleDatabaseException("Cannot set electable group size override on " + _prettyGroupNodeName, e); + LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e); } } @@ -666,7 +717,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - private void restartEnvironment(DatabaseException dbe) throws AMQStoreException + private void restartEnvironment(DatabaseException dbe) { LOGGER.info("Restarting environment"); @@ -828,7 +879,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig, final ReplicationConfig replicationConfig) { - Future environmentFuture = _restartEnvironmentExecutor.submit(new Callable(){ + Future environmentFuture = _environmentJobExecutor.submit(new Callable(){ @Override public ReplicatedEnvironment call() throws Exception { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 45117b17c2..052341c810 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -221,8 +222,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { ReplicatedEnvironmentFacade facade = createMaster(); assertEquals("Unexpected priority", TEST_PRIORITY, facade.getPriority()); - - facade.setPriority(TEST_PRIORITY + 1); + Future future = facade.setPriority(TEST_PRIORITY + 1); + future.get(5, TimeUnit.SECONDS); assertEquals("Unexpected priority after change", TEST_PRIORITY + 1, facade.getPriority()); } @@ -230,16 +231,17 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { ReplicatedEnvironmentFacade master = createMaster(); assertEquals("Unexpected designated primary", TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); - master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); + Future future = master.setDesignatedPrimary(!TEST_DESIGNATED_PRIMARY); + future.get(5, TimeUnit.SECONDS); assertEquals("Unexpected designated primary after change", !TEST_DESIGNATED_PRIMARY, master.isDesignatedPrimary()); } - public void testElectableGroupSizeOverride() throws Exception { ReplicatedEnvironmentFacade facade = createMaster(); assertEquals("Unexpected Electable Group Size Override", TEST_ELECTABLE_GROUP_OVERRIDE, facade.getElectableGroupSizeOverride()); - facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1); + Future future = facade.setElectableGroupSizeOverride(TEST_ELECTABLE_GROUP_OVERRIDE + 1); + future.get(5, TimeUnit.SECONDS); assertEquals("Unexpected Electable Group Size Override after change", TEST_ELECTABLE_GROUP_OVERRIDE + 1, facade.getElectableGroupSizeOverride()); } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java index 52b873384d..9adc834f95 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java @@ -204,6 +204,11 @@ public class HAClusterManagementTest extends QpidBrokerTestCase final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPort); assertFalse("Unexpected designated primary before change", storeBean.getDesignatedPrimary()); storeBean.setDesignatedPrimary(true); + long limit = System.currentTimeMillis() + 5000; + while(!storeBean.getDesignatedPrimary() && System.currentTimeMillis() < limit) + { + Thread.sleep(100l); + } assertTrue("Unexpected designated primary after change", storeBean.getDesignatedPrimary()); } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java index b70703c526..c757a5d99c 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java @@ -137,8 +137,7 @@ public class ReplicationNodeRestTest extends QpidRestTestCase } private void assertReplicationNodeSetAttribute(String attributeName, Object initialValue, - Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws IOException, JsonGenerationException, - JsonMappingException + Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws Exception { Map nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); assertEquals("Unexpected " + attributeName + " after creation", initialValue, nodeAttributes.get(attributeName)); @@ -146,19 +145,28 @@ public class ReplicationNodeRestTest extends QpidRestTestCase int responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.singletonMap(attributeName, newValueBeforeHostActivation)); assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode); - nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); - assertEquals("Unexpected " + attributeName + " after update but before host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName)); + waitForAttributeChanged(attributeName, newValueBeforeHostActivation); responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", Collections.singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE)); assertEquals("Unexpected response code for virtual host update status", 200, responseCode); - nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); - assertEquals("Unexpected " + attributeName + " after host activation", newValueBeforeHostActivation, nodeAttributes.get(attributeName)); + waitForAttributeChanged(attributeName, newValueBeforeHostActivation); responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.singletonMap(attributeName, newValueAfterHostActivation)); assertEquals("Unexpected response code for node " + attributeName + " update", 200, responseCode); - nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); - assertEquals("Unexpected " + attributeName + " after update after host activation", newValueAfterHostActivation, nodeAttributes.get(attributeName)); + waitForAttributeChanged(attributeName, newValueAfterHostActivation); + } + + private void waitForAttributeChanged(String attributeName, Object newValue) throws Exception + { + Map nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); + long limit = System.currentTimeMillis() + 5000; + while(!newValue.equals(nodeAttributes.get(attributeName)) && System.currentTimeMillis() < limit) + { + Thread.sleep(100l); + nodeAttributes = getRestTestHelper().getJsonAsSingletonList(_nodeRestUrl); + } + assertEquals("Unexpected attribute " + attributeName, newValue, nodeAttributes.get(attributeName)); } } -- cgit v1.2.1