// Copyright 2018 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/fuchsia/async_dispatcher.h" #include #include #include #include #include #include "base/callback.h" #include "base/process/launch.h" #include "base/test/multiprocess_test.h" #include "base/test/test_timeouts.h" #include "base/time/time.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/multiprocess_func_list.h" namespace base { namespace { struct TestTask : public async_task_t { explicit TestTask() { state = ASYNC_STATE_INIT; handler = &TaskProc; deadline = 0; } static void TaskProc(async_dispatcher_t* async, async_task_t* task, zx_status_t status); int num_calls = 0; int repeats = 1; OnceClosure on_call; zx_status_t last_status = ZX_OK; }; // static void TestTask::TaskProc(async_dispatcher_t* async, async_task_t* task, zx_status_t status) { EXPECT_EQ(async, async_get_default_dispatcher()); EXPECT_TRUE(status == ZX_OK || status == ZX_ERR_CANCELED) << "status: " << status; auto* test_task = static_cast(task); test_task->num_calls++; test_task->last_status = status; if (test_task->on_call) std::move(test_task->on_call).Run(); if (test_task->num_calls < test_task->repeats) async_post_task(async, task); }; struct TestWait : public async_wait_t { TestWait(zx_handle_t handle, zx_signals_t signals) { state = ASYNC_STATE_INIT; handler = &HandleProc; object = handle; trigger = signals; } static void HandleProc(async_dispatcher_t* async, async_wait_t* wait, zx_status_t status, const zx_packet_signal_t* signal); int num_calls = 0; OnceClosure on_call; zx_status_t last_status = ZX_OK; }; // static void TestWait::HandleProc(async_dispatcher_t* async, async_wait_t* wait, zx_status_t status, const zx_packet_signal_t* signal) { EXPECT_EQ(async, async_get_default_dispatcher()); EXPECT_TRUE(status == ZX_OK || status == ZX_ERR_CANCELED) << "status: " << status; auto* test_wait = static_cast(wait); test_wait->num_calls++; test_wait->last_status = status; if (test_wait->on_call) std::move(test_wait->on_call).Run(); } struct TestException : public async_exception_t { TestException(zx_handle_t handle) { state = ASYNC_STATE_INIT; handler = &HandleProc; task = handle; options = 0; } static void HandleProc(async_dispatcher_t* async, async_exception_t* wait, zx_status_t status, const zx_port_packet_t* packet); int num_calls = 0; OnceClosure on_call; zx_status_t last_status = ZX_OK; }; // static void TestException::HandleProc(async_dispatcher_t* async, async_exception_t* wait, zx_status_t status, const zx_port_packet_t* packet) { EXPECT_EQ(async, async_get_default_dispatcher()); auto* test_wait = static_cast(wait); test_wait->num_calls++; test_wait->last_status = status; if (test_wait->on_call) std::move(test_wait->on_call).Run(); } } // namespace class AsyncDispatcherTest : public MultiProcessTest { public: AsyncDispatcherTest() { dispatcher_ = std::make_unique(); async_ = async_get_default_dispatcher(); EXPECT_TRUE(async_); EXPECT_EQ(zx::socket::create(ZX_SOCKET_DATAGRAM, &socket1_, &socket2_), ZX_OK); } ~AsyncDispatcherTest() override = default; void RunUntilIdle() { while (true) { zx_status_t status = dispatcher_->DispatchOrWaitUntil(0); if (status != ZX_OK) { EXPECT_EQ(status, ZX_ERR_TIMED_OUT); break; } } } protected: std::unique_ptr dispatcher_; async_dispatcher_t* async_ = nullptr; zx::socket socket1_; zx::socket socket2_; }; TEST_F(AsyncDispatcherTest, PostTask) { TestTask task; ASSERT_EQ(async_post_task(async_, &task), ZX_OK); dispatcher_->DispatchOrWaitUntil(0); EXPECT_EQ(task.num_calls, 1); EXPECT_EQ(task.last_status, ZX_OK); } TEST_F(AsyncDispatcherTest, TaskRepeat) { TestTask task; task.repeats = 2; ASSERT_EQ(async_post_task(async_, &task), ZX_OK); RunUntilIdle(); EXPECT_EQ(task.num_calls, 2); EXPECT_EQ(task.last_status, ZX_OK); } TEST_F(AsyncDispatcherTest, DelayedTask) { TestTask task; constexpr auto kDelay = TimeDelta::FromMilliseconds(5); TimeTicks started = TimeTicks::Now(); task.deadline = zx_deadline_after(kDelay.InNanoseconds()); ASSERT_EQ(async_post_task(async_, &task), ZX_OK); zx_status_t status = dispatcher_->DispatchOrWaitUntil(zx_deadline_after( (kDelay + TestTimeouts::tiny_timeout()).InNanoseconds())); EXPECT_EQ(status, ZX_OK); EXPECT_GE(TimeTicks::Now() - started, kDelay); } TEST_F(AsyncDispatcherTest, CancelTask) { TestTask task; ASSERT_EQ(async_post_task(async_, &task), ZX_OK); ASSERT_EQ(async_cancel_task(async_, &task), ZX_OK); RunUntilIdle(); EXPECT_EQ(task.num_calls, 0); } TEST_F(AsyncDispatcherTest, TaskObserveShutdown) { TestTask task; ASSERT_EQ(async_post_task(async_, &task), ZX_OK); dispatcher_.reset(); EXPECT_EQ(task.num_calls, 1); EXPECT_EQ(task.last_status, ZX_ERR_CANCELED); } TEST_F(AsyncDispatcherTest, Wait) { TestWait wait(socket1_.get(), ZX_SOCKET_READABLE); EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK); // Handler shouldn't be called because the event wasn't signaled. RunUntilIdle(); EXPECT_EQ(wait.num_calls, 0); char byte = 0; EXPECT_EQ(socket2_.write(/*options=*/0, &byte, sizeof(byte), /*actual=*/nullptr), ZX_OK); zx_status_t status = dispatcher_->DispatchOrWaitUntil( zx_deadline_after(TestTimeouts::tiny_timeout().InNanoseconds())); EXPECT_EQ(status, ZX_OK); EXPECT_EQ(wait.num_calls, 1); EXPECT_EQ(wait.last_status, ZX_OK); } TEST_F(AsyncDispatcherTest, CancelWait) { TestWait wait(socket1_.get(), ZX_SOCKET_READABLE); EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK); char byte = 0; EXPECT_EQ(socket2_.write(/*options=*/0, &byte, sizeof(byte), /*actual=*/nullptr), ZX_OK); EXPECT_EQ(async_cancel_wait(async_, &wait), ZX_OK); RunUntilIdle(); EXPECT_EQ(wait.num_calls, 0); } TEST_F(AsyncDispatcherTest, WaitShutdown) { TestWait wait(socket1_.get(), ZX_SOCKET_READABLE); EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK); RunUntilIdle(); dispatcher_.reset(); EXPECT_EQ(wait.num_calls, 1); EXPECT_EQ(wait.last_status, ZX_ERR_CANCELED); } // Sub-process which crashes itself, to generate an exception-port event. MULTIPROCESS_TEST_MAIN(AsyncDispatcherCrashingChild) { IMMEDIATE_CRASH(); return 0; } TEST_F(AsyncDispatcherTest, BindExceptionPort) { zx::job child_job; ASSERT_EQ(zx::job::create(*zx::job::default_job(), 0, &child_job), ZX_OK); // Bind |child_job|'s exception port to the dispatcher. TestException exception(child_job.get()); EXPECT_EQ(async_bind_exception_port(async_, &exception), ZX_OK); // Launch a child process in the job, that will immediately crash. LaunchOptions options; options.job_handle = child_job.get(); Process child = SpawnChildWithOptions("AsyncDispatcherCrashingChild", options); ASSERT_TRUE(child.IsValid()); // Wait for the exception event to be handled. EXPECT_EQ( dispatcher_->DispatchOrWaitUntil( (TimeTicks::Now() + TestTimeouts::action_max_timeout()).ToZxTime()), ZX_OK); EXPECT_EQ(exception.num_calls, 1); EXPECT_EQ(exception.last_status, ZX_OK); EXPECT_EQ(async_unbind_exception_port(async_, &exception), ZX_OK); ASSERT_EQ(child_job.kill(), ZX_OK); } TEST_F(AsyncDispatcherTest, CancelExceptionPort) { zx::job child_job; ASSERT_EQ(zx::job::create(*zx::job::default_job(), 0, &child_job), ZX_OK); // Bind |child_job|'s exception port to the dispatcher. TestException exception(child_job.get()); EXPECT_EQ(async_bind_exception_port(async_, &exception), ZX_OK); // Tear-down the dispatcher, and verify that the |exception| is cancelled. dispatcher_ = nullptr; EXPECT_EQ(exception.num_calls, 1); EXPECT_EQ(exception.last_status, ZX_ERR_CANCELED); ASSERT_EQ(child_job.kill(), ZX_OK); } } // namespace base