diff options
author | yinfangxi <yinfangxi@kuaishou.com> | 2020-03-02 18:12:30 -0800 |
---|---|---|
committer | Mate Szalay-Beko <mszalay@cloudera.com> | 2022-05-17 10:59:38 +0200 |
commit | af44dabd425473ae57cba5e0deed4fc39de7e05c (patch) | |
tree | 76c8826909e27d6310a00089e8a5edce34539b9d | |
parent | aa20b0e80ab9afc9043f716a06b1c862a6125092 (diff) | |
download | zookeeper-af44dabd425473ae57cba5e0deed4fc39de7e05c.tar.gz |
ZOOKEEPER-3706: ZooKeeper.close() would leak SendThread when the netw…
…ork is broken
- add unit test to verify the bug
- bypass the SendThread.startConnect() by throw RuntimeExcepth if state.isAlive is false
Author: Fangxi Yin <yinfangxikuaishou.com>
Author: yinfangxi <yinfangxi@kuaishou.com>
Reviewers: Michael Han <hanm@apache.org>, Enrico Olivelli <eolivelli@gmail.com>, maoling
Closes #1235 from yfxhust/ZOOKEEPER-3706
-rw-r--r-- | zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java | 69 | ||||
-rw-r--r-- | zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java | 359 |
2 files changed, 403 insertions, 25 deletions
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 5341f5761..3449a8921 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -178,7 +178,7 @@ public class ClientCnxn { * operation) */ private volatile boolean closing = false; - + /** * A set of ZooKeeper hosts this client could connect to. */ @@ -379,11 +379,11 @@ public class ClientCnxn { * @param canBeReadOnly * whether the connection is allowed to go to read-only * mode in case of partitioning - * @throws IOException + * @throws IOException in cases of broken network */ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, - long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { + long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; @@ -649,7 +649,7 @@ public class ClientCnxn { .substring(chrootPath.length())), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); - } + } } else if (p.response instanceof MultiResponse) { MultiCallback cb = (MultiCallback) p.cb; MultiResponse rsp = (MultiResponse) p.response; @@ -735,6 +735,11 @@ public class ClientCnxn { eventThread.queueCallback(cb, rc, path, ctx); } + // for test only + protected void onConnecting(InetSocketAddress addr) { + + } + private void conLossPacket(Packet p) { if (p.replyHeader == null) { return; @@ -764,7 +769,7 @@ public class ClientCnxn { public EndOfStreamException(String msg) { super(msg); } - + @Override public String toString() { return "EndOfStreamException: " + getMessage(); @@ -778,7 +783,7 @@ public class ClientCnxn { super(msg); } } - + private static class SessionExpiredException extends IOException { private static final long serialVersionUID = -1388816932076193249L; @@ -826,10 +831,10 @@ public class ClientCnxn { return; } if (replyHdr.getXid() == -4) { - // -4 is the xid for AuthPacket + // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { - state = States.AUTH_FAILED; - eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, + changeZkState(States.AUTH_FAILED); + eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); eventThread.queueEventOfDeath(); } @@ -927,9 +932,9 @@ public class ClientCnxn { } } - SendThread(ClientCnxnSocket clientCnxnSocket) { + SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException { super(makeThreadName("-SendThread()")); - state = States.CONNECTING; + changeZkState(States.CONNECTING); this.clientCnxnSocket = clientCnxnSocket; setDaemon(true); } @@ -940,13 +945,22 @@ public class ClientCnxn { // Runnable /** * Used by ClientCnxnSocket - * + * * @return */ - ZooKeeper.States getZkState() { + synchronized ZooKeeper.States getZkState() { return state; } + synchronized void changeZkState(ZooKeeper.States newState) throws IOException { + if (!state.isAlive() && newState == States.CONNECTING) { + throw new IOException( + "Connection has already been closed and reconnection is not allowed"); + } + // It's safer to place state modification at the end. + state = newState; + } + ClientCnxnSocket getClientCnxnSocket() { return clientCnxnSocket; } @@ -1073,7 +1087,7 @@ public class ClientCnxn { LOG.warn("Unexpected exception", e); } } - state = States.CONNECTING; + changeZkState(States.CONNECTING); String hostPort = addr.getHostString() + ":" + addr.getPort(); MDC.put("myid", hostPort); @@ -1136,6 +1150,7 @@ public class ClientCnxn { } else { serverAddress = hostProvider.next(1000); } + onConnecting(serverAddress); startConnect(serverAddress); // Update now to start the connection timer right after we make a connection attempt clientCnxnSocket.updateNow(); @@ -1150,8 +1165,8 @@ public class ClientCnxn { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { - LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); - state = States.AUTH_FAILED; + LOG.error("SASL authentication with Zookeeper Quorum member failed.", e); + changeZkState(States.AUTH_FAILED); sendAuthEvent = true; } } @@ -1159,7 +1174,7 @@ public class ClientCnxn { if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. - state = States.AUTH_FAILED; + changeZkState(States.AUTH_FAILED); sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { @@ -1181,7 +1196,7 @@ public class ClientCnxn { } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } - + if (to <= 0) { String warnInfo; warnInfo = "Client session timed out, have not heard from server in " @@ -1194,8 +1209,8 @@ public class ClientCnxn { } if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping - //also make sure not to send too many pings when readTimeout is small - int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - + //also make sure not to send too many pings when readTimeout is small + int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { @@ -1363,7 +1378,7 @@ public class ClientCnxn { /** * Callback invoked by the ClientCnxnSocket once a connection has been * established. - * + * * @param _negotiatedSessionTimeout * @param _sessionId * @param _sessionPasswd @@ -1374,7 +1389,7 @@ public class ClientCnxn { byte[] _sessionPasswd, boolean isRO) throws IOException { negotiatedSessionTimeout = _negotiatedSessionTimeout; if (negotiatedSessionTimeout <= 0) { - state = States.CLOSED; + changeZkState(States.CLOSED); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, @@ -1395,8 +1410,7 @@ public class ClientCnxn { hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; - state = (isRO) ? - States.CONNECTEDREADONLY : States.CONNECTED; + changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED); seenRwServerBefore |= !isRO; LOG.info("Session establishment complete on server " + clientCnxnSocket.getRemoteSocketAddress() @@ -1411,7 +1425,12 @@ public class ClientCnxn { } void close() { - state = States.CLOSED; + try { + changeZkState(States.CLOSED); + } catch (IOException e) { + LOG.warn("Connection close fails when migrates state from {} to CLOSED", + getZkState()); + } clientCnxnSocket.onClosing(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java new file mode 100644 index 000000000..50c93361e --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.ClientCnxn.Packet; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.test.ClientBase; +import org.junit.Assert; +import org.junit.Test; + +public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase { + + private static final int SERVER_COUNT = 3; + + private static final int SESSION_TIMEOUT = 40000; + + public static final int CONNECTION_TIMEOUT = 30000; + + private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator(); + + private volatile CustomZooKeeper zk = null; + + private volatile FragileClientCnxnSocketNIO socket = null; + + private volatile CustomClientCnxn cnxn = null; + + private String getCxnString(int[] clientPorts) { + StringBuffer hostPortBuffer = new StringBuffer(); + for (int i = 0; i < clientPorts.length; i++) { + hostPortBuffer.append("127.0.0.1:"); + hostPortBuffer.append(clientPorts[i]); + if (i != (clientPorts.length - 1)) { + hostPortBuffer.append(','); + } + } + return hostPortBuffer.toString(); + } + + private void closeZookeeper(ZooKeeper zk) { + Executors.newSingleThreadExecutor().submit(() -> { + try { + LOG.info("closeZookeeper is fired"); + zk.close(); + } catch (InterruptedException e) { + } + }); + } + + @Test + public void testClientCnxnSocketFragility() throws Exception { + System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, + FragileClientCnxnSocketNIO.class.getName()); + System.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, "1000"); + final int[] clientPorts = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + MainThread[] mt = new MainThread[SERVER_COUNT]; + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false); + mt[i].start(); + } + + // Ensure server started + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT)); + } + String path = "/testClientCnxnSocketFragility"; + String data = "balabala"; + ClientWatcher watcher = new ClientWatcher(); + zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher); + watcher.watchFor(zk); + + // Let's see some successful operations + zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Assert.assertEquals(new String(zk.getData(path, false, new Stat())), data); + Assert.assertTrue(!watcher.isSessionExpired()); + + // Let's make a broken operation + socket.mute(); + boolean catchKeeperException = false; + try { + zk.getData(path, false, new Stat()); + } catch (KeeperException e) { + catchKeeperException = true; + Assert.assertFalse(e instanceof KeeperException.SessionExpiredException); + } + socket.unmute(); + Assert.assertTrue(catchKeeperException); + Assert.assertTrue(!watcher.isSessionExpired()); + + GetDataRetryForeverBackgroundTask retryForeverGetData = + new GetDataRetryForeverBackgroundTask(zk, path); + retryForeverGetData.startTask(); + // Let's make a broken network + socket.mute(); + + // Let's attempt to close ZooKeeper + cnxn.attemptClose(); + + // Wait some time to expect continuous reconnecting. + // We try to make reconnecting hit the unsafe region. + cnxn.waitUntilHitUnsafeRegion(); + + // close zk with timeout 1000 milli seconds + closeZookeeper(zk); + TimeUnit.MILLISECONDS.sleep(3000); + + // Since we already close zookeeper, we expect that the zk should not be alive. + Assert.assertTrue(!zk.isAlive()); + Assert.assertTrue(!watcher.isSessionExpired()); + + retryForeverGetData.syncCloseTask(); + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + } + + class GetDataRetryForeverBackgroundTask extends Thread { + private volatile boolean alive; + private final CustomZooKeeper zk; + private final String path; + + GetDataRetryForeverBackgroundTask(CustomZooKeeper zk, String path) { + this.alive = false; + this.zk = zk; + this.path = path; + // marked as daemon to avoid exhausting CPU + setDaemon(true); + } + + void startTask() { + alive = true; + start(); + } + + void syncCloseTask() throws InterruptedException { + alive = false; + join(); + } + + @Override + public void run() { + while (alive) { + try { + zk.getData(path, false, new Stat()); + // sleep for a while to avoid exhausting CPU + TimeUnit.MILLISECONDS.sleep(500); + } catch (Exception e) { + LOG.info("zookeeper getData failed on path {}", path); + } + } + } + } + + public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO { + + private volatile boolean mute; + + public FragileClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException { + super(clientConfig); + mute = false; + } + + synchronized void mute() { + if (!mute) { + LOG.info("Fire socket mute"); + mute = true; + } + } + + synchronized void unmute() { + if (mute) { + LOG.info("Fire socket unmute"); + mute = false; + } + } + + @Override + void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) + throws IOException, InterruptedException { + if (mute) { + throw new IOException("Socket is mute"); + } + super.doTransport(waitTimeOut, pendingQueue, cnxn); + } + + @Override + void connect(InetSocketAddress addr) throws IOException { + if (mute) { + throw new IOException("Socket is mute"); + } + super.connect(addr); + } + } + + class ClientWatcher implements Watcher { + + private ZooKeeper zk; + + private boolean sessionExpired = false; + + void watchFor(ZooKeeper zk) { + this.zk = zk; + } + + @Override + public void process(WatchedEvent event) { + LOG.info("Watcher got {}", event); + if (event.getState() == KeeperState.Expired) { + sessionExpired = true; + } + } + + boolean isSessionExpired() { + return sessionExpired; + } + } + + // Coordinate to construct the risky scenario. + class UnsafeCoordinator { + + private CountDownLatch syncLatch = new CountDownLatch(2); + + void sync(boolean closing) { + LOG.info("Attempt to sync with {}", closing); + if (closing) { + syncLatch.countDown(); + try { + syncLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + class CustomClientCnxn extends ClientCnxn { + + private volatile boolean closing = false; + + private volatile boolean hitUnsafeRegion = false; + + public CustomClientCnxn( + String chrootPath, + HostProvider hostProvider, + int sessionTimeout, + ZooKeeper zooKeeper, + ClientWatchManager watcher, + ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + } + + void attemptClose() { + closing = true; + } + + void waitUntilHitUnsafeRegion() { + while (!hitUnsafeRegion) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + } + } + } + + @Override + protected void onConnecting(InetSocketAddress addr) { + if (closing) { + LOG.info("Attempt to connnecting {} {} {}", addr, closing, state); + ///////// Unsafe Region //////// + // Slow down and zoom out the unsafe point to make risk + // The unsafe point is that startConnect happens after sendThread.close + hitUnsafeRegion = true; + unsafeCoordinator.sync(closing); + //////////////////////////////// + } + } + + @Override + public void disconnect() { + Assert.assertTrue(closing); + LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), closing, state); + sendThread.close(); + ///////// Unsafe Region //////// + unsafeCoordinator.sync(closing); + //////////////////////////////// + try { + sendThread.join(); + } catch (InterruptedException ex) { + LOG.warn("Got interrupted while waiting for the sender thread to close", ex); + } + eventThread.queueEventOfDeath(); + } + } + + class CustomZooKeeper extends ZooKeeper { + + public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { + super(connectString, sessionTimeout, watcher); + } + + public boolean isAlive() { + return cnxn.getState().isAlive(); + } + + @Override + protected ClientCnxn createConnection( + String chrootPath, + HostProvider hostProvider, + int sessionTimeout, + ZooKeeper zooKeeper, + ClientWatchManager watcher, + ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO); + socket = (FragileClientCnxnSocketNIO) clientCnxnSocket; + ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly); + return ClientCnxnSocketFragilityTest.this.cnxn; + } + } +}
\ No newline at end of file |