summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryinfangxi <yinfangxi@kuaishou.com>2020-03-02 18:12:30 -0800
committerMate Szalay-Beko <mszalay@cloudera.com>2022-05-17 10:59:38 +0200
commitaf44dabd425473ae57cba5e0deed4fc39de7e05c (patch)
tree76c8826909e27d6310a00089e8a5edce34539b9d
parentaa20b0e80ab9afc9043f716a06b1c862a6125092 (diff)
downloadzookeeper-af44dabd425473ae57cba5e0deed4fc39de7e05c.tar.gz
ZOOKEEPER-3706: ZooKeeper.close() would leak SendThread when the netw…
…ork is broken - add unit test to verify the bug - bypass the SendThread.startConnect() by throw RuntimeExcepth if state.isAlive is false Author: Fangxi Yin <yinfangxikuaishou.com> Author: yinfangxi <yinfangxi@kuaishou.com> Reviewers: Michael Han <hanm@apache.org>, Enrico Olivelli <eolivelli@gmail.com>, maoling Closes #1235 from yfxhust/ZOOKEEPER-3706
-rw-r--r--zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java69
-rw-r--r--zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java359
2 files changed, 403 insertions, 25 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index 5341f5761..3449a8921 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -178,7 +178,7 @@ public class ClientCnxn {
* operation)
*/
private volatile boolean closing = false;
-
+
/**
* A set of ZooKeeper hosts this client could connect to.
*/
@@ -379,11 +379,11 @@ public class ClientCnxn {
* @param canBeReadOnly
* whether the connection is allowed to go to read-only
* mode in case of partitioning
- * @throws IOException
+ * @throws IOException in cases of broken network
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
- long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
+ long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
@@ -649,7 +649,7 @@ public class ClientCnxn {
.substring(chrootPath.length())), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
- }
+ }
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
@@ -735,6 +735,11 @@ public class ClientCnxn {
eventThread.queueCallback(cb, rc, path, ctx);
}
+ // for test only
+ protected void onConnecting(InetSocketAddress addr) {
+
+ }
+
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
@@ -764,7 +769,7 @@ public class ClientCnxn {
public EndOfStreamException(String msg) {
super(msg);
}
-
+
@Override
public String toString() {
return "EndOfStreamException: " + getMessage();
@@ -778,7 +783,7 @@ public class ClientCnxn {
super(msg);
}
}
-
+
private static class SessionExpiredException extends IOException {
private static final long serialVersionUID = -1388816932076193249L;
@@ -826,10 +831,10 @@ public class ClientCnxn {
return;
}
if (replyHdr.getXid() == -4) {
- // -4 is the xid for AuthPacket
+ // -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
- state = States.AUTH_FAILED;
- eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
+ changeZkState(States.AUTH_FAILED);
+ eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
eventThread.queueEventOfDeath();
}
@@ -927,9 +932,9 @@ public class ClientCnxn {
}
}
- SendThread(ClientCnxnSocket clientCnxnSocket) {
+ SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
super(makeThreadName("-SendThread()"));
- state = States.CONNECTING;
+ changeZkState(States.CONNECTING);
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
@@ -940,13 +945,22 @@ public class ClientCnxn {
// Runnable
/**
* Used by ClientCnxnSocket
- *
+ *
* @return
*/
- ZooKeeper.States getZkState() {
+ synchronized ZooKeeper.States getZkState() {
return state;
}
+ synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
+ if (!state.isAlive() && newState == States.CONNECTING) {
+ throw new IOException(
+ "Connection has already been closed and reconnection is not allowed");
+ }
+ // It's safer to place state modification at the end.
+ state = newState;
+ }
+
ClientCnxnSocket getClientCnxnSocket() {
return clientCnxnSocket;
}
@@ -1073,7 +1087,7 @@ public class ClientCnxn {
LOG.warn("Unexpected exception", e);
}
}
- state = States.CONNECTING;
+ changeZkState(States.CONNECTING);
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
@@ -1136,6 +1150,7 @@ public class ClientCnxn {
} else {
serverAddress = hostProvider.next(1000);
}
+ onConnecting(serverAddress);
startConnect(serverAddress);
// Update now to start the connection timer right after we make a connection attempt
clientCnxnSocket.updateNow();
@@ -1150,8 +1165,8 @@ public class ClientCnxn {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
- LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
- state = States.AUTH_FAILED;
+ LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
+ changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
}
}
@@ -1159,7 +1174,7 @@ public class ClientCnxn {
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
- state = States.AUTH_FAILED;
+ changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
@@ -1181,7 +1196,7 @@ public class ClientCnxn {
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
-
+
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
@@ -1194,8 +1209,8 @@ public class ClientCnxn {
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
- //also make sure not to send too many pings when readTimeout is small
- int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
+ //also make sure not to send too many pings when readTimeout is small
+ int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
@@ -1363,7 +1378,7 @@ public class ClientCnxn {
/**
* Callback invoked by the ClientCnxnSocket once a connection has been
* established.
- *
+ *
* @param _negotiatedSessionTimeout
* @param _sessionId
* @param _sessionPasswd
@@ -1374,7 +1389,7 @@ public class ClientCnxn {
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
- state = States.CLOSED;
+ changeZkState(States.CLOSED);
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
@@ -1395,8 +1410,7 @@ public class ClientCnxn {
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
- state = (isRO) ?
- States.CONNECTEDREADONLY : States.CONNECTED;
+ changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
@@ -1411,7 +1425,12 @@ public class ClientCnxn {
}
void close() {
- state = States.CLOSED;
+ try {
+ changeZkState(States.CLOSED);
+ } catch (IOException e) {
+ LOG.warn("Connection close fails when migrates state from {} to CLOSED",
+ getZkState());
+ }
clientCnxnSocket.onClosing();
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
new file mode 100644
index 000000000..50c93361e
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
+
+ private static final int SERVER_COUNT = 3;
+
+ private static final int SESSION_TIMEOUT = 40000;
+
+ public static final int CONNECTION_TIMEOUT = 30000;
+
+ private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator();
+
+ private volatile CustomZooKeeper zk = null;
+
+ private volatile FragileClientCnxnSocketNIO socket = null;
+
+ private volatile CustomClientCnxn cnxn = null;
+
+ private String getCxnString(int[] clientPorts) {
+ StringBuffer hostPortBuffer = new StringBuffer();
+ for (int i = 0; i < clientPorts.length; i++) {
+ hostPortBuffer.append("127.0.0.1:");
+ hostPortBuffer.append(clientPorts[i]);
+ if (i != (clientPorts.length - 1)) {
+ hostPortBuffer.append(',');
+ }
+ }
+ return hostPortBuffer.toString();
+ }
+
+ private void closeZookeeper(ZooKeeper zk) {
+ Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ LOG.info("closeZookeeper is fired");
+ zk.close();
+ } catch (InterruptedException e) {
+ }
+ });
+ }
+
+ @Test
+ public void testClientCnxnSocketFragility() throws Exception {
+ System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+ FragileClientCnxnSocketNIO.class.getName());
+ System.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, "1000");
+ final int[] clientPorts = new int[SERVER_COUNT];
+ StringBuilder sb = new StringBuilder();
+ String server;
+
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":"
+ + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i];
+ sb.append(server + "\n");
+ }
+ String currentQuorumCfgSection = sb.toString();
+ MainThread[] mt = new MainThread[SERVER_COUNT];
+
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
+ mt[i].start();
+ }
+
+ // Ensure server started
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ Assert.assertTrue("waiting for server " + i + " being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
+ }
+ String path = "/testClientCnxnSocketFragility";
+ String data = "balabala";
+ ClientWatcher watcher = new ClientWatcher();
+ zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher);
+ watcher.watchFor(zk);
+
+ // Let's see some successful operations
+ zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Assert.assertEquals(new String(zk.getData(path, false, new Stat())), data);
+ Assert.assertTrue(!watcher.isSessionExpired());
+
+ // Let's make a broken operation
+ socket.mute();
+ boolean catchKeeperException = false;
+ try {
+ zk.getData(path, false, new Stat());
+ } catch (KeeperException e) {
+ catchKeeperException = true;
+ Assert.assertFalse(e instanceof KeeperException.SessionExpiredException);
+ }
+ socket.unmute();
+ Assert.assertTrue(catchKeeperException);
+ Assert.assertTrue(!watcher.isSessionExpired());
+
+ GetDataRetryForeverBackgroundTask retryForeverGetData =
+ new GetDataRetryForeverBackgroundTask(zk, path);
+ retryForeverGetData.startTask();
+ // Let's make a broken network
+ socket.mute();
+
+ // Let's attempt to close ZooKeeper
+ cnxn.attemptClose();
+
+ // Wait some time to expect continuous reconnecting.
+ // We try to make reconnecting hit the unsafe region.
+ cnxn.waitUntilHitUnsafeRegion();
+
+ // close zk with timeout 1000 milli seconds
+ closeZookeeper(zk);
+ TimeUnit.MILLISECONDS.sleep(3000);
+
+ // Since we already close zookeeper, we expect that the zk should not be alive.
+ Assert.assertTrue(!zk.isAlive());
+ Assert.assertTrue(!watcher.isSessionExpired());
+
+ retryForeverGetData.syncCloseTask();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ mt[i].shutdown();
+ }
+ }
+
+ class GetDataRetryForeverBackgroundTask extends Thread {
+ private volatile boolean alive;
+ private final CustomZooKeeper zk;
+ private final String path;
+
+ GetDataRetryForeverBackgroundTask(CustomZooKeeper zk, String path) {
+ this.alive = false;
+ this.zk = zk;
+ this.path = path;
+ // marked as daemon to avoid exhausting CPU
+ setDaemon(true);
+ }
+
+ void startTask() {
+ alive = true;
+ start();
+ }
+
+ void syncCloseTask() throws InterruptedException {
+ alive = false;
+ join();
+ }
+
+ @Override
+ public void run() {
+ while (alive) {
+ try {
+ zk.getData(path, false, new Stat());
+ // sleep for a while to avoid exhausting CPU
+ TimeUnit.MILLISECONDS.sleep(500);
+ } catch (Exception e) {
+ LOG.info("zookeeper getData failed on path {}", path);
+ }
+ }
+ }
+ }
+
+ public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+
+ private volatile boolean mute;
+
+ public FragileClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException {
+ super(clientConfig);
+ mute = false;
+ }
+
+ synchronized void mute() {
+ if (!mute) {
+ LOG.info("Fire socket mute");
+ mute = true;
+ }
+ }
+
+ synchronized void unmute() {
+ if (mute) {
+ LOG.info("Fire socket unmute");
+ mute = false;
+ }
+ }
+
+ @Override
+ void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
+ throws IOException, InterruptedException {
+ if (mute) {
+ throw new IOException("Socket is mute");
+ }
+ super.doTransport(waitTimeOut, pendingQueue, cnxn);
+ }
+
+ @Override
+ void connect(InetSocketAddress addr) throws IOException {
+ if (mute) {
+ throw new IOException("Socket is mute");
+ }
+ super.connect(addr);
+ }
+ }
+
+ class ClientWatcher implements Watcher {
+
+ private ZooKeeper zk;
+
+ private boolean sessionExpired = false;
+
+ void watchFor(ZooKeeper zk) {
+ this.zk = zk;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("Watcher got {}", event);
+ if (event.getState() == KeeperState.Expired) {
+ sessionExpired = true;
+ }
+ }
+
+ boolean isSessionExpired() {
+ return sessionExpired;
+ }
+ }
+
+ // Coordinate to construct the risky scenario.
+ class UnsafeCoordinator {
+
+ private CountDownLatch syncLatch = new CountDownLatch(2);
+
+ void sync(boolean closing) {
+ LOG.info("Attempt to sync with {}", closing);
+ if (closing) {
+ syncLatch.countDown();
+ try {
+ syncLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ class CustomClientCnxn extends ClientCnxn {
+
+ private volatile boolean closing = false;
+
+ private volatile boolean hitUnsafeRegion = false;
+
+ public CustomClientCnxn(
+ String chrootPath,
+ HostProvider hostProvider,
+ int sessionTimeout,
+ ZooKeeper zooKeeper,
+ ClientWatchManager watcher,
+ ClientCnxnSocket clientCnxnSocket,
+ boolean canBeReadOnly) throws IOException {
+ super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+ }
+
+ void attemptClose() {
+ closing = true;
+ }
+
+ void waitUntilHitUnsafeRegion() {
+ while (!hitUnsafeRegion) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ @Override
+ protected void onConnecting(InetSocketAddress addr) {
+ if (closing) {
+ LOG.info("Attempt to connnecting {} {} {}", addr, closing, state);
+ ///////// Unsafe Region ////////
+ // Slow down and zoom out the unsafe point to make risk
+ // The unsafe point is that startConnect happens after sendThread.close
+ hitUnsafeRegion = true;
+ unsafeCoordinator.sync(closing);
+ ////////////////////////////////
+ }
+ }
+
+ @Override
+ public void disconnect() {
+ Assert.assertTrue(closing);
+ LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), closing, state);
+ sendThread.close();
+ ///////// Unsafe Region ////////
+ unsafeCoordinator.sync(closing);
+ ////////////////////////////////
+ try {
+ sendThread.join();
+ } catch (InterruptedException ex) {
+ LOG.warn("Got interrupted while waiting for the sender thread to close", ex);
+ }
+ eventThread.queueEventOfDeath();
+ }
+ }
+
+ class CustomZooKeeper extends ZooKeeper {
+
+ public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
+ super(connectString, sessionTimeout, watcher);
+ }
+
+ public boolean isAlive() {
+ return cnxn.getState().isAlive();
+ }
+
+ @Override
+ protected ClientCnxn createConnection(
+ String chrootPath,
+ HostProvider hostProvider,
+ int sessionTimeout,
+ ZooKeeper zooKeeper,
+ ClientWatchManager watcher,
+ ClientCnxnSocket clientCnxnSocket,
+ boolean canBeReadOnly) throws IOException {
+ Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO);
+ socket = (FragileClientCnxnSocketNIO) clientCnxnSocket;
+ ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+ return ClientCnxnSocketFragilityTest.this.cnxn;
+ }
+ }
+} \ No newline at end of file