diff options
author | Andor Molnar <andor@apache.org> | 2018-10-12 10:25:27 +0200 |
---|---|---|
committer | Mate Szalay-Beko <mszalay@cloudera.com> | 2022-05-17 11:54:17 +0200 |
commit | 67285ce5bb209eb926b2850a0c37e90e1db13382 (patch) | |
tree | 15bfd8774e713e8ef771f4f993dde8bfd98e1f91 | |
parent | af44dabd425473ae57cba5e0deed4fc39de7e05c (diff) | |
download | zookeeper-67285ce5bb209eb926b2850a0c37e90e1db13382.tar.gz |
ZOOKEEPER-3161: Refactor QuorumPeerMainTest.java: move commonly used functions to base class
Move the following methods to QuorumPeerTestBase.java:
- tearDown()
- LaunchServers()
- waitForOne(), waitForAll()
- logStates()
Author: Andor Molnar <andor@apache.org>
Reviewers: andor@apache.org
Closes #659 from anmolnar/ZOOKEEPER-3161
(cherry picked from commit ee250f141678f79e9517bfc8913956199fad55bb)
2 files changed, 145 insertions, 146 deletions
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index a1eeaa3ac..aef5bd417 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -35,7 +35,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.file.Paths; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -52,7 +51,6 @@ import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; @@ -63,7 +61,6 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.test.ClientBase; -import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -73,22 +70,6 @@ import org.junit.Test; */ public class QuorumPeerMainTest extends QuorumPeerTestBase { - private Servers servers; - private int numServers = 0; - - @After - public void tearDown() throws Exception { - if (servers == null || servers.mt == null) { - LOG.info("No servers to shutdown!"); - return; - } - for (int i = 0; i < numServers; i++) { - if (i < servers.mt.length) { - servers.mt[i].shutdown(); - } - } - } - /** * Verify the ability to start a cluster. */ @@ -454,132 +435,6 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing); } - public static void waitForOne(ZooKeeper zk, States state) throws InterruptedException { - int iterations = ClientBase.CONNECTION_TIMEOUT / 500; - while (zk.getState() != state) { - if (iterations-- == 0) { - throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state); - } - Thread.sleep(500); - } - } - - private void waitForAll(Servers servers, States state) throws InterruptedException { - waitForAll(servers.zk, state); - } - - public static void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException { - int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; - boolean someoneNotConnected = true; - while (someoneNotConnected) { - if (iterations-- == 0) { - logStates(zks); - ClientBase.logAllStackTraces(); - throw new RuntimeException("Waiting too long"); - } - - someoneNotConnected = false; - for (ZooKeeper zk : zks) { - if (zk.getState() != state) { - someoneNotConnected = true; - break; - } - } - Thread.sleep(1000); - } - } - - public static void logStates(ZooKeeper[] zks) { - StringBuilder sbBuilder = new StringBuilder("Connection States: {"); - for (int i = 0; i < zks.length; i++) { - sbBuilder.append(i + " : " + zks[i].getState() + ", "); - } - sbBuilder.append('}'); - LOG.error(sbBuilder.toString()); - } - - // This class holds the servers and clients for those servers - private static class Servers { - MainThread mt[]; - ZooKeeper zk[]; - int[] clientPorts; - - public void shutDownAllServers() throws InterruptedException { - for (MainThread t: mt) { - t.shutdown(); - } - } - - public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException { - for (MainThread t : mt) { - if (!t.isAlive()) { - t.start(); - } - } - for (int i = 0; i < zk.length; i++) { - restartClient(i, watcher); - } - } - - public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException { - if (zk[clientIndex] != null) { - zk[clientIndex].close(); - } - zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher); - } - - public int findLeader() { - for (int i = 0; i < mt.length; i++) { - if (mt[i].main.quorumPeer.leader != null) { - return i; - } - } - return -1; - } - } - - - private Servers LaunchServers(int numServers) throws IOException, InterruptedException { - return LaunchServers(numServers, null); - } - - /** - * This is a helper function for launching a set of servers - * - * @param numServers the number of servers - * @param tickTime A ticktime to pass to MainThread - * @return - * @throws IOException - * @throws InterruptedException - */ - private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException { - int SERVER_COUNT = numServers; - Servers svrs = new Servers(); - svrs.clientPorts = new int[SERVER_COUNT]; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < SERVER_COUNT; i++) { - svrs.clientPorts[i] = PortAssignment.unique(); - sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n"); - } - String quorumCfgSection = sb.toString(); - - svrs.mt = new MainThread[SERVER_COUNT]; - svrs.zk = new ZooKeeper[SERVER_COUNT]; - for(int i = 0; i < SERVER_COUNT; i++) { - if (tickTime != null) { - svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime); - } else { - svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection); - } - svrs.mt[i].start(); - svrs.restartClient(i, this); - } - - waitForAll(svrs, States.CONNECTED); - - return svrs; - } - /** * Verify handling of bad quorum address */ @@ -1216,7 +1071,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase { } } } - + private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) { for (Proposal proposal : proposals.values()) { if (proposal.request.getHdr().getType() == type) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java index ffc00f39a..22f989314 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java @@ -32,6 +32,9 @@ import java.util.Map.Entry; import java.util.Set; import java.util.Properties; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZooKeeper; +import org.junit.After; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.WatchedEvent; @@ -53,6 +56,22 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher { public static final int TIMEOUT = 5000; + protected Servers servers; + protected int numServers = 0; + + @After + public void tearDown() throws Exception { + if (servers == null || servers.mt == null) { + LOG.info("No servers to shutdown!"); + return; + } + for (int i = 0; i < numServers; i++) { + if (i < servers.mt.length) { + servers.mt[i].shutdown(); + } + } + } + public void process(WatchedEvent event) { // ignore for this test } @@ -390,4 +409,129 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher { } } + + // This class holds the servers and clients for those servers + protected static class Servers { + MainThread mt[]; + ZooKeeper zk[]; + int[] clientPorts; + + public void shutDownAllServers() throws InterruptedException { + for (MainThread t: mt) { + t.shutdown(); + } + } + + public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException { + for (MainThread t : mt) { + if (!t.isAlive()) { + t.start(); + } + } + for (int i = 0; i < zk.length; i++) { + restartClient(i, watcher); + } + } + + public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException { + if (zk[clientIndex] != null) { + zk[clientIndex].close(); + } + zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher); + } + + public int findLeader() { + for (int i = 0; i < mt.length; i++) { + if (mt[i].main.quorumPeer.leader != null) { + return i; + } + } + return -1; + } + } + + protected Servers LaunchServers(int numServers) throws IOException, InterruptedException { + return LaunchServers(numServers, null); + } + + /** * This is a helper function for launching a set of servers + * + * @param numServers the number of servers + * @param tickTime A ticktime to pass to MainThread + * @return + * @throws IOException + * @throws InterruptedException + */ + protected Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException { + int SERVER_COUNT = numServers; + QuorumPeerMainTest.Servers svrs = new QuorumPeerMainTest.Servers(); + svrs.clientPorts = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < SERVER_COUNT; i++) { + svrs.clientPorts[i] = PortAssignment.unique(); + sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n"); + } + String quorumCfgSection = sb.toString(); + + svrs.mt = new MainThread[SERVER_COUNT]; + svrs.zk = new ZooKeeper[SERVER_COUNT]; + for(int i = 0; i < SERVER_COUNT; i++) { + if (tickTime != null) { + svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime); + } else { + svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection); + } + svrs.mt[i].start(); + svrs.restartClient(i, this); + } + + waitForAll(svrs, ZooKeeper.States.CONNECTED); + + return svrs; + } + + public static void waitForOne(ZooKeeper zk, ZooKeeper.States state) throws InterruptedException { + int iterations = ClientBase.CONNECTION_TIMEOUT / 500; + while (zk.getState() != state) { + if (iterations-- == 0) { + throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state); + } + Thread.sleep(500); + } + } + + protected void waitForAll(Servers servers, ZooKeeper.States state) throws InterruptedException { + waitForAll(servers.zk, state); + } + + public static void waitForAll(ZooKeeper[] zks, ZooKeeper.States state) throws InterruptedException { + int iterations = ClientBase.CONNECTION_TIMEOUT / 1000; + boolean someoneNotConnected = true; + while (someoneNotConnected) { + if (iterations-- == 0) { + logStates(zks); + ClientBase.logAllStackTraces(); + throw new RuntimeException("Waiting too long"); + } + + someoneNotConnected = false; + for (ZooKeeper zk : zks) { + if (zk.getState() != state) { + someoneNotConnected = true; + break; + } + } + Thread.sleep(1000); + } + } + + public static void logStates(ZooKeeper[] zks) { + StringBuilder sbBuilder = new StringBuilder("Connection States: {"); + for (int i = 0; i < zks.length; i++) { + sbBuilder.append(i + " : " + zks[i].getState() + ", "); + } + sbBuilder.append('}'); + LOG.error(sbBuilder.toString()); + } + } |