summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-09-23 06:54:51 +0000
committerKeith Wall <kwall@apache.org>2014-09-23 06:54:51 +0000
commit64952d54c47cf518eb6905b9a5c4a0374b63b1e8 (patch)
treef1c0e302ea62711d6d0db80d9b7fa4b920d67774
parent6422454ca27384f41d2b67d3a294c89d7279c062 (diff)
downloadqpid-python-64952d54c47cf518eb6905b9a5c4a0374b63b1e8.tar.gz
QPID-6102: [Java Broker] HA Prevent InsufficientReplica/InsufficientAckExceptions and other exceptions that require JE environment restart from causing Broker shutdown.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1626953 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java2
-rw-r--r--java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java111
2 files changed, 113 insertions, 0 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 4cd677586a..fa417981c7 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -90,6 +90,7 @@ import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.DaemonThreadFactory;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
@@ -384,6 +385,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
if (restart)
{
tryToRestartEnvironment(dbe);
+ throw new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe);
}
return dbe;
}
diff --git a/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
index 5701c8c9c8..9e850714b0 100644
--- a/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
+++ b/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -25,13 +25,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
@@ -262,6 +267,112 @@ public class MultiNodeTest extends QpidBrokerTestCase
_groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
}
+ public void testTransferMasterWhilstMessagesInFlight() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+ connection.start();
+ ((AMQConnection) connection).setConnectionListener(_failoverListener);
+
+ final AtomicBoolean masterTransfered = new AtomicBoolean(false);
+ final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ final AtomicReference<Exception> workerException = new AtomicReference<>();
+ final CountDownLatch producedOneBefore = new CountDownLatch(1);
+ final CountDownLatch producedOneAfter = new CountDownLatch(1);
+ final CountDownLatch workerShutdown = new CountDownLatch(1);
+
+
+ Runnable producer = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ int count = 0;
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageProducer producer = session.createProducer(destination);
+ session.createConsumer(destination).close();
+
+ while (keepRunning.get())
+ {
+ String messageText = "message" + count;
+ try
+ {
+ Message message = session.createTextMessage(messageText);
+ producer.send(message);
+ session.commit();
+ LOGGER.debug("Sent message " + count);
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+
+ }
+ catch(JMSException je)
+ {
+
+ }
+
+ producedOneBefore.countDown();
+
+ if (masterTransfered.get())
+ {
+ producedOneAfter.countDown();
+ }
+ count++;
+ }
+ }
+ catch (Exception e)
+ {
+ workerException.set(e);
+ }
+ finally
+ {
+ workerShutdown.countDown();
+ }
+ }
+
+ };
+
+ Thread backgroundWorker = new Thread(producer);
+ backgroundWorker.start();
+
+ boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue(workerRunning);
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
+ Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+ masterTransfered.set(true);
+
+ _failoverListener.awaitFailoverCompletion(20000);
+ LOGGER.info("Failover has finished");
+
+ attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+
+ boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
+
+ keepRunning.set(false);
+ boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Worker thread should have shutdown", shutdown);
+
+ backgroundWorker.join(5000);
+ assertNull(workerException.get());
+
+ }
+
public void testQuorumOverride() throws Exception
{
final Connection connection = getConnection(_positiveFailoverUrl);