From bc1b231c9e32667b2978c86a6a64833470973dbd Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sun, 25 Sep 2022 16:21:45 +0800 Subject: ZOOKEEPER-4327: Fix flaky RequestThrottlerTest This PR tries to fix several test failures in `RequestThrottlerTest`. First, `RequestThrottlerTest#testDropStaleRequests`. Place `Thread.sleep(200)` after `submittedRequests.take()` in `RequestThrottler#run` will fail two assertions: 1. `assertEquals(2L, (long) metrics.get("prep_processor_request_queued"))` 2. `assertEquals(1L, (long) metrics.get("request_throttle_wait_count"))` This happens due to `setStale` chould happen before throttle handling. This commit solves this by introducing an interception point `RequestThrottler.throttleSleep` to build happen-before relations: 1. `throttling.countDown` happens before `setStale`, this ensures that unthrottled request are processed as usual. 2. `setStale` happens before `throttled.await`, this defends `RequestThrottler.throttleSleep` against spurious wakeup. Second, `RequestThrottlerTest#testRequestThrottler`. * `RequestThrottlerTest.testRequestThrottler:197 expected: <2> but was: <1>` `ZooKeeperServer#submitRequest` and `PrepRequestProcessor#processRequest` run in different threads, thus there is no guarantee on metric `prep_processor_request_queued` after `submitted.await(5, TimeUnit.SECONDS)`. Place `Thread.sleep(200)` before `zks.submitRequestNow(request)` in `RequestThrottler#run` will incur this failure. * `RequestThrottlerTest.testRequestThrottler:206 expected: <5> but was: <4>` `entered.await(STALL_TIME, TimeUnit.MILLISECONDS)` could return `false` due to almost same timeout as `RequestThrottler#throttleSleep`. Place `Thread.sleep(500)` around `throttleSleep` will increase failure possibility. Third, `RequestThrottlerTest#testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled`. * `RequestThrottlerTest.testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled:340 expected: <3> but was: <4>` `ZooKeeperServer#shouldThrottle` depends on consistent sum of `getInflight` and `getInProcess`. But it is no true. Place `Thread.sleep(200)` before `zks.submitRequestNow(request)` in `RequestThrottler#run` could reproduce this. Sees also https://github.com/apache/zookeeper/pull/1739, https://github.com/apache/zookeeper/pull/1821. Author: Kezhu Wang Reviewers: Mate Szalay-Beko , maoling Closes #1887 from kezhuw/ZOOKEEPER-4327-flaky-RequestThrottlerTest.testDropStaleRequests --- .../apache/zookeeper/server/RequestThrottler.java | 12 ++-- .../apache/zookeeper/server/ZooKeeperServer.java | 5 +- .../zookeeper/server/RequestThrottlerTest.java | 66 ++++++++++++++++++---- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java index d60efa087..4a401e5b9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java @@ -195,13 +195,11 @@ public class RequestThrottler extends ZooKeeperCriticalThread { LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped); } - private synchronized void throttleSleep(int stallTime) { - try { - ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1); - this.wait(stallTime); - } catch (InterruptedException ie) { - return; - } + + // @VisibleForTesting + synchronized void throttleSleep(int stallTime) throws InterruptedException { + ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1); + this.wait(stallTime); } @SuppressFBWarnings(value = "NN_NAKED_NOTIFY", justification = "state change is in ZooKeeperServer.decInProgress() ") diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 0303ca645..817e84b3e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -749,9 +749,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } protected void startRequestThrottler() { - requestThrottler = new RequestThrottler(this); + requestThrottler = createRequestThrottler(); requestThrottler.start(); + } + protected RequestThrottler createRequestThrottler() { + return new RequestThrottler(this); } protected void setupRequestProcessors() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java index ed2239990..152592075 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java @@ -67,11 +67,17 @@ public class RequestThrottlerTest extends ZKTestCase { CountDownLatch disconnected = null; + CountDownLatch throttled = null; + CountDownLatch throttling = null; + ZooKeeperServer zks = null; ServerCnxnFactory f = null; ZooKeeper zk = null; int connectionLossCount = 0; + private long getCounterMetric(String name) { + return (long) MetricsUtils.currentServerMetrics().get(name); + } @BeforeEach public void setup() throws Exception { @@ -115,6 +121,11 @@ public class RequestThrottlerTest extends ZKTestCase { super(snapDir, logDir, tickTime); } + @Override + protected RequestThrottler createRequestThrottler() { + return new TestRequestThrottler(this); + } + @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); @@ -141,6 +152,24 @@ public class RequestThrottlerTest extends ZKTestCase { } } + class TestRequestThrottler extends RequestThrottler { + public TestRequestThrottler(ZooKeeperServer zks) { + super(zks); + } + + @Override + synchronized void throttleSleep(int stallTime) throws InterruptedException { + if (throttling != null) { + throttling.countDown(); + } + super.throttleSleep(stallTime); + // Defend against unstable timing and potential spurious wakeup. + if (throttled != null) { + assertTrue(throttled.await(20, TimeUnit.SECONDS)); + } + } + } + class TestPrepRequestProcessor extends PrepRequestProcessor { public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) { @@ -191,18 +220,22 @@ public class RequestThrottlerTest extends ZKTestCase { // make sure the server received all 5 requests submitted.await(5, TimeUnit.SECONDS); - Map metrics = MetricsUtils.currentServerMetrics(); // but only two requests can get into the pipeline because of the throttler - assertEquals(2L, (long) metrics.get("prep_processor_request_queued")); - assertEquals(1L, (long) metrics.get("request_throttle_wait_count")); + WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2; + waitFor("request not queued", requestQueued, 5); + + WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1; + waitFor("no throttle wait", throttleWait, 5); // let the requests go through the pipeline and the throttler will be waken up to allow more requests // to enter the pipeline resumeProcess.countDown(); - entered.await(STALL_TIME, TimeUnit.MILLISECONDS); - metrics = MetricsUtils.currentServerMetrics(); + // wait for more than one STALL_TIME to reduce timeout before wakeup + assertTrue(entered.await(STALL_TIME + 5000, TimeUnit.MILLISECONDS)); + + Map metrics = MetricsUtils.currentServerMetrics(); assertEquals(TOTAL_REQUESTS, (long) metrics.get("prep_processor_request_queued")); } @@ -221,6 +254,9 @@ public class RequestThrottlerTest extends ZKTestCase { resumeProcess = new CountDownLatch(1); submitted = new CountDownLatch(TOTAL_REQUESTS); + throttled = new CountDownLatch(1); + throttling = new CountDownLatch(1); + // send 5 requests asynchronously for (int i = 0; i < TOTAL_REQUESTS; i++) { zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " @@ -231,11 +267,18 @@ public class RequestThrottlerTest extends ZKTestCase { // make sure the server received all 5 requests assertTrue(submitted.await(5, TimeUnit.SECONDS)); + // stale throttled requests + assertTrue(throttling.await(5, TimeUnit.SECONDS)); for (ServerCnxn cnxn : f.cnxns) { cnxn.setStale(); } + throttled.countDown(); zk = null; + // only first three requests are counted as finished + finished = new CountDownLatch(3); + + // let the requests go through the pipeline resumeProcess.countDown(); LOG.info("raise the latch"); @@ -243,6 +286,8 @@ public class RequestThrottlerTest extends ZKTestCase { Thread.sleep(50); } + assertTrue(finished.await(5, TimeUnit.SECONDS)); + // assert after all requests processed to avoid concurrent issues as metrics are // counted in different threads. Map metrics = MetricsUtils.currentServerMetrics(); @@ -327,7 +372,6 @@ public class RequestThrottlerTest extends ZKTestCase { RequestThrottler.setMaxRequests(0); resumeProcess = new CountDownLatch(1); int totalRequests = 10; - submitted = new CountDownLatch(totalRequests); for (int i = 0; i < totalRequests; i++) { zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " @@ -335,16 +379,16 @@ public class RequestThrottlerTest extends ZKTestCase { }, null); } - submitted.await(5, TimeUnit.SECONDS); - // We should start throttling instead of queuing more requests. // // We always allow up to GLOBAL_OUTSTANDING_LIMIT + 1 number of requests coming in request processing pipeline // before throttling. For the next request, we will throttle by disabling receiving future requests but we still - // allow this single request coming in. So the total number of queued requests in processing pipeline would + // allow this single request coming in. Ideally, the total number of queued requests in processing pipeline would // be GLOBAL_OUTSTANDING_LIMIT + 2. - assertEquals(Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2, - (long) MetricsUtils.currentServerMetrics().get("prep_processor_request_queued")); + // + // But due to leak of consistent view of number of outstanding requests, the number could be larger. + WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2; + waitFor("no enough requests queued", requestQueued, 5); resumeProcess.countDown(); } catch (Exception e) { -- cgit v1.2.1