diff options
author | Keith Wall <kwall@apache.org> | 2014-09-23 06:54:51 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-09-23 06:54:51 +0000 |
commit | 64952d54c47cf518eb6905b9a5c4a0374b63b1e8 (patch) | |
tree | f1c0e302ea62711d6d0db80d9b7fa4b920d67774 | |
parent | 6422454ca27384f41d2b67d3a294c89d7279c062 (diff) | |
download | qpid-python-64952d54c47cf518eb6905b9a5c4a0374b63b1e8.tar.gz |
QPID-6102: [Java Broker] HA Prevent InsufficientReplica/InsufficientAckExceptions and other exceptions that require JE environment restart from causing Broker shutdown.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1626953 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 113 insertions, 0 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 4cd677586a..fa417981c7 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -90,6 +90,7 @@ import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.DaemonThreadFactory; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener @@ -384,6 +385,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan if (restart) { tryToRestartEnvironment(dbe); + throw new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe); } return dbe; } diff --git a/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java index 5701c8c9c8..9e850714b0 100644 --- a/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java +++ b/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java @@ -25,13 +25,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TransactionRolledBackException; import com.sleepycat.je.Durability; import com.sleepycat.je.EnvironmentConfig; @@ -262,6 +267,112 @@ public class MultiNodeTest extends QpidBrokerTestCase _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); } + public void testTransferMasterWhilstMessagesInFlight() throws Exception + { + final Connection connection = getConnection(_positiveFailoverUrl); + connection.start(); + ((AMQConnection) connection).setConnectionListener(_failoverListener); + + final AtomicBoolean masterTransfered = new AtomicBoolean(false); + final AtomicBoolean keepRunning = new AtomicBoolean(true); + final AtomicReference<Exception> workerException = new AtomicReference<>(); + final CountDownLatch producedOneBefore = new CountDownLatch(1); + final CountDownLatch producedOneAfter = new CountDownLatch(1); + final CountDownLatch workerShutdown = new CountDownLatch(1); + + + Runnable producer = new Runnable() + { + @Override + public void run() + { + try + { + int count = 0; + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(getTestQueueName()); + MessageProducer producer = session.createProducer(destination); + session.createConsumer(destination).close(); + + while (keepRunning.get()) + { + String messageText = "message" + count; + try + { + Message message = session.createTextMessage(messageText); + producer.send(message); + session.commit(); + LOGGER.debug("Sent message " + count); + } + catch (TransactionRolledBackException trbe) + { + + } + catch(JMSException je) + { + + } + + producedOneBefore.countDown(); + + if (masterTransfered.get()) + { + producedOneAfter.countDown(); + } + count++; + } + } + catch (Exception e) + { + workerException.set(e); + } + finally + { + workerShutdown.countDown(); + } + } + + }; + + Thread backgroundWorker = new Thread(producer); + backgroundWorker.start(); + + boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS); + assertTrue(workerRunning); + + final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + LOGGER.info("Active connection port " + activeBrokerPort); + + final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); + LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort); + + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); + Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + + _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + masterTransfered.set(true); + + _failoverListener.awaitFailoverCompletion(20000); + LOGGER.info("Failover has finished"); + + attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + + _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + + boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Should have successfully produced at least one message after transfer complete", producedMore); + + keepRunning.set(false); + boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Worker thread should have shutdown", shutdown); + + backgroundWorker.join(5000); + assertNull(workerException.get()); + + } + public void testQuorumOverride() throws Exception { final Connection connection = getConnection(_positiveFailoverUrl); |