summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndor Molnar <andor@apache.org>2018-10-12 10:25:27 +0200
committerMate Szalay-Beko <mszalay@cloudera.com>2022-05-17 11:54:17 +0200
commit67285ce5bb209eb926b2850a0c37e90e1db13382 (patch)
tree15bfd8774e713e8ef771f4f993dde8bfd98e1f91
parentaf44dabd425473ae57cba5e0deed4fc39de7e05c (diff)
downloadzookeeper-67285ce5bb209eb926b2850a0c37e90e1db13382.tar.gz
ZOOKEEPER-3161: Refactor QuorumPeerMainTest.java: move commonly used functions to base class
Move the following methods to QuorumPeerTestBase.java: - tearDown() - LaunchServers() - waitForOne(), waitForAll() - logStates() Author: Andor Molnar <andor@apache.org> Reviewers: andor@apache.org Closes #659 from anmolnar/ZOOKEEPER-3161 (cherry picked from commit ee250f141678f79e9517bfc8913956199fad55bb)
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java147
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java144
2 files changed, 145 insertions, 146 deletions
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index a1eeaa3ac..aef5bd417 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -35,7 +35,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -52,7 +51,6 @@ import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -63,7 +61,6 @@ import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -73,22 +70,6 @@ import org.junit.Test;
*/
public class QuorumPeerMainTest extends QuorumPeerTestBase {
- private Servers servers;
- private int numServers = 0;
-
- @After
- public void tearDown() throws Exception {
- if (servers == null || servers.mt == null) {
- LOG.info("No servers to shutdown!");
- return;
- }
- for (int i = 0; i < numServers; i++) {
- if (i < servers.mt.length) {
- servers.mt[i].shutdown();
- }
- }
- }
-
/**
* Verify the ability to start a cluster.
*/
@@ -454,132 +435,6 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing);
}
- public static void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
- int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
- while (zk.getState() != state) {
- if (iterations-- == 0) {
- throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
- }
- Thread.sleep(500);
- }
- }
-
- private void waitForAll(Servers servers, States state) throws InterruptedException {
- waitForAll(servers.zk, state);
- }
-
- public static void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
- int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
- boolean someoneNotConnected = true;
- while (someoneNotConnected) {
- if (iterations-- == 0) {
- logStates(zks);
- ClientBase.logAllStackTraces();
- throw new RuntimeException("Waiting too long");
- }
-
- someoneNotConnected = false;
- for (ZooKeeper zk : zks) {
- if (zk.getState() != state) {
- someoneNotConnected = true;
- break;
- }
- }
- Thread.sleep(1000);
- }
- }
-
- public static void logStates(ZooKeeper[] zks) {
- StringBuilder sbBuilder = new StringBuilder("Connection States: {");
- for (int i = 0; i < zks.length; i++) {
- sbBuilder.append(i + " : " + zks[i].getState() + ", ");
- }
- sbBuilder.append('}');
- LOG.error(sbBuilder.toString());
- }
-
- // This class holds the servers and clients for those servers
- private static class Servers {
- MainThread mt[];
- ZooKeeper zk[];
- int[] clientPorts;
-
- public void shutDownAllServers() throws InterruptedException {
- for (MainThread t: mt) {
- t.shutdown();
- }
- }
-
- public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
- for (MainThread t : mt) {
- if (!t.isAlive()) {
- t.start();
- }
- }
- for (int i = 0; i < zk.length; i++) {
- restartClient(i, watcher);
- }
- }
-
- public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
- if (zk[clientIndex] != null) {
- zk[clientIndex].close();
- }
- zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
- }
-
- public int findLeader() {
- for (int i = 0; i < mt.length; i++) {
- if (mt[i].main.quorumPeer.leader != null) {
- return i;
- }
- }
- return -1;
- }
- }
-
-
- private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
- return LaunchServers(numServers, null);
- }
-
- /**
- * This is a helper function for launching a set of servers
- *
- * @param numServers the number of servers
- * @param tickTime A ticktime to pass to MainThread
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
- private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
- int SERVER_COUNT = numServers;
- Servers svrs = new Servers();
- svrs.clientPorts = new int[SERVER_COUNT];
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < SERVER_COUNT; i++) {
- svrs.clientPorts[i] = PortAssignment.unique();
- sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
- }
- String quorumCfgSection = sb.toString();
-
- svrs.mt = new MainThread[SERVER_COUNT];
- svrs.zk = new ZooKeeper[SERVER_COUNT];
- for(int i = 0; i < SERVER_COUNT; i++) {
- if (tickTime != null) {
- svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
- } else {
- svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection);
- }
- svrs.mt[i].start();
- svrs.restartClient(i, this);
- }
-
- waitForAll(svrs, States.CONNECTED);
-
- return svrs;
- }
-
/**
* Verify handling of bad quorum address
*/
@@ -1216,7 +1071,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
}
}
}
-
+
private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
for (Proposal proposal : proposals.values()) {
if (proposal.request.getHdr().getType() == type) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index ffc00f39a..22f989314 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -32,6 +32,9 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.Properties;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.WatchedEvent;
@@ -53,6 +56,22 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
public static final int TIMEOUT = 5000;
+ protected Servers servers;
+ protected int numServers = 0;
+
+ @After
+ public void tearDown() throws Exception {
+ if (servers == null || servers.mt == null) {
+ LOG.info("No servers to shutdown!");
+ return;
+ }
+ for (int i = 0; i < numServers; i++) {
+ if (i < servers.mt.length) {
+ servers.mt[i].shutdown();
+ }
+ }
+ }
+
public void process(WatchedEvent event) {
// ignore for this test
}
@@ -390,4 +409,129 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
}
}
+
+ // This class holds the servers and clients for those servers
+ protected static class Servers {
+ MainThread mt[];
+ ZooKeeper zk[];
+ int[] clientPorts;
+
+ public void shutDownAllServers() throws InterruptedException {
+ for (MainThread t: mt) {
+ t.shutdown();
+ }
+ }
+
+ public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
+ for (MainThread t : mt) {
+ if (!t.isAlive()) {
+ t.start();
+ }
+ }
+ for (int i = 0; i < zk.length; i++) {
+ restartClient(i, watcher);
+ }
+ }
+
+ public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
+ if (zk[clientIndex] != null) {
+ zk[clientIndex].close();
+ }
+ zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
+ }
+
+ public int findLeader() {
+ for (int i = 0; i < mt.length; i++) {
+ if (mt[i].main.quorumPeer.leader != null) {
+ return i;
+ }
+ }
+ return -1;
+ }
+ }
+
+ protected Servers LaunchServers(int numServers) throws IOException, InterruptedException {
+ return LaunchServers(numServers, null);
+ }
+
+ /** * This is a helper function for launching a set of servers
+ *
+ * @param numServers the number of servers
+ * @param tickTime A ticktime to pass to MainThread
+ * @return
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
+ int SERVER_COUNT = numServers;
+ QuorumPeerMainTest.Servers svrs = new QuorumPeerMainTest.Servers();
+ svrs.clientPorts = new int[SERVER_COUNT];
+ StringBuilder sb = new StringBuilder();
+ for(int i = 0; i < SERVER_COUNT; i++) {
+ svrs.clientPorts[i] = PortAssignment.unique();
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
+ }
+ String quorumCfgSection = sb.toString();
+
+ svrs.mt = new MainThread[SERVER_COUNT];
+ svrs.zk = new ZooKeeper[SERVER_COUNT];
+ for(int i = 0; i < SERVER_COUNT; i++) {
+ if (tickTime != null) {
+ svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
+ } else {
+ svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection);
+ }
+ svrs.mt[i].start();
+ svrs.restartClient(i, this);
+ }
+
+ waitForAll(svrs, ZooKeeper.States.CONNECTED);
+
+ return svrs;
+ }
+
+ public static void waitForOne(ZooKeeper zk, ZooKeeper.States state) throws InterruptedException {
+ int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
+ while (zk.getState() != state) {
+ if (iterations-- == 0) {
+ throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
+ }
+ Thread.sleep(500);
+ }
+ }
+
+ protected void waitForAll(Servers servers, ZooKeeper.States state) throws InterruptedException {
+ waitForAll(servers.zk, state);
+ }
+
+ public static void waitForAll(ZooKeeper[] zks, ZooKeeper.States state) throws InterruptedException {
+ int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
+ boolean someoneNotConnected = true;
+ while (someoneNotConnected) {
+ if (iterations-- == 0) {
+ logStates(zks);
+ ClientBase.logAllStackTraces();
+ throw new RuntimeException("Waiting too long");
+ }
+
+ someoneNotConnected = false;
+ for (ZooKeeper zk : zks) {
+ if (zk.getState() != state) {
+ someoneNotConnected = true;
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ }
+ }
+
+ public static void logStates(ZooKeeper[] zks) {
+ StringBuilder sbBuilder = new StringBuilder("Connection States: {");
+ for (int i = 0; i < zks.length; i++) {
+ sbBuilder.append(i + " : " + zks[i].getState() + ", ");
+ }
+ sbBuilder.append('}');
+ LOG.error(sbBuilder.toString());
+ }
+
}