From a859410aea35dfef5fa54c99fb8a5bfc81f1a46b Mon Sep 17 00:00:00 2001 From: Botond Hejj Date: Fri, 27 Jul 2018 19:37:08 -0700 Subject: ZOOKEEPER-3072: Throttle race condition fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Making the throttle check before passing over the request to the next thread will prevent the possibility of throttling code running after unthrottle Added an additional async hammer thread which is pretty reliably reproduces the race condition. The globalOutstandingLimit is decreased so throttling code is executed. Author: Botond Hejj Reviewers: Andor Molnár , Norbert Kalmar , Benjamin Reed Closes #563 from bothejjms/ZOOKEEPER-3072 (cherry picked from commit 2a372fcdce3c0142c0bb23f06098a2c1a49f807e) Signed-off-by: Benjamin Reed --- .../apache/zookeeper/server/ZooKeeperServer.java | 30 ++++++++++------------ 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 3692dc4a8..a21140dcd 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1114,24 +1114,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { cnxn.disableRecv(); } return; + } else if (h.getType() == OpCode.sasl) { + Record rsp = processSasl(incomingBuffer,cnxn); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); + cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it? + return; } else { - if (h.getType() == OpCode.sasl) { - Record rsp = processSasl(incomingBuffer,cnxn); - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); - cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it? - return; - } - else { - Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), - h.getType(), incomingBuffer, cnxn.getAuthInfo()); - si.setOwner(ServerCnxn.me); - // Always treat packet from the client as a possible - // local request. - setLocalSessionFlag(si); - submitRequest(si); - } + cnxn.incrOutstandingRequests(h); + Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), + h.getType(), incomingBuffer, cnxn.getAuthInfo()); + si.setOwner(ServerCnxn.me); + // Always treat packet from the client as a possible + // local request. + setLocalSessionFlag(si); + submitRequest(si); + return; } - cnxn.incrOutstandingRequests(h); } private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException { -- cgit v1.2.1