diff options
author | Alexey Kozyatinskiy <kozyatinskiy@chromium.org> | 2018-08-17 18:47:05 -0700 |
---|---|---|
committer | Alexey Kozyatinskiy <kozyatinskiy@chromium.org> | 2018-08-20 18:10:00 -0700 |
commit | b1e26128f317a6f5a5808a0a727e98f80f088b84 (patch) | |
tree | b75efbdab9c37fd573e75d325b848ea84b1ee819 /src/node_platform.cc | |
parent | f1d3f97c3bc813528e85b9c3e6506fc75b931d92 (diff) | |
download | node-new-b1e26128f317a6f5a5808a0a727e98f80f088b84.tar.gz |
src: implement v8::Platform::CallDelayedOnWorkerThread
This method is crucial for Runtime.evaluate protocol command with
timeout flag. At least Chrome DevTools frontend uses this method for
every execution in console.
PR-URL: https://github.com/nodejs/node/pull/22383
Fixes: https://github.com/nodejs/node/issues/22157
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src/node_platform.cc')
-rw-r--r-- | src/node_platform.cc | 124 |
1 files changed, 123 insertions, 1 deletions
diff --git a/src/node_platform.cc b/src/node_platform.cc index 6a3ae2e5dc..92e9b371c5 100644 --- a/src/node_platform.cc +++ b/src/node_platform.cc @@ -2,6 +2,7 @@ #include "node_internals.h" #include "env-inl.h" +#include "debug_utils.h" #include "util.h" #include <algorithm> @@ -29,7 +30,127 @@ static void PlatformWorkerThread(void* data) { } // namespace +class WorkerThreadsTaskRunner::DelayedTaskScheduler { + public: + explicit DelayedTaskScheduler(TaskQueue<Task>* tasks) + : pending_worker_tasks_(tasks) {} + + std::unique_ptr<uv_thread_t> Start() { + auto start_thread = [](void* data) { + static_cast<DelayedTaskScheduler*>(data)->Run(); + }; + std::unique_ptr<uv_thread_t> t { new uv_thread_t() }; + uv_sem_init(&ready_, 0); + CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this)); + uv_sem_wait(&ready_); + uv_sem_destroy(&ready_); + return t; + } + + void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) { + tasks_.Push(std::unique_ptr<Task>(new ScheduleTask(this, std::move(task), + delay_in_seconds))); + uv_async_send(&flush_tasks_); + } + + void Stop() { + tasks_.Push(std::unique_ptr<Task>(new StopTask(this))); + uv_async_send(&flush_tasks_); + } + + private: + void Run() { + TRACE_EVENT_METADATA1("__metadata", "thread_name", "name", + "WorkerThreadsTaskRunner::DelayedTaskScheduler"); + loop_.data = this; + CHECK_EQ(0, uv_loop_init(&loop_)); + flush_tasks_.data = this; + CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks)); + uv_sem_post(&ready_); + + uv_run(&loop_, UV_RUN_DEFAULT); + CheckedUvLoopClose(&loop_); + } + + static void FlushTasks(uv_async_t* flush_tasks) { + DelayedTaskScheduler* scheduler = + ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop); + while (std::unique_ptr<Task> task = scheduler->tasks_.Pop()) + task->Run(); + } + + class StopTask : public Task { + public: + explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {} + + void Run() override { + std::vector<uv_timer_t*> timers; + for (uv_timer_t* timer : scheduler_->timers_) + timers.push_back(timer); + for (uv_timer_t* timer : timers) + scheduler_->TakeTimerTask(timer); + uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_), + [](uv_handle_t* handle) {}); + } + + private: + DelayedTaskScheduler* scheduler_; + }; + + class ScheduleTask : public Task { + public: + ScheduleTask(DelayedTaskScheduler* scheduler, + std::unique_ptr<Task> task, + double delay_in_seconds) + : scheduler_(scheduler), + task_(std::move(task)), + delay_in_seconds_(delay_in_seconds) {} + + void Run() override { + uint64_t delay_millis = + static_cast<uint64_t>(delay_in_seconds_ + 0.5) * 1000; + std::unique_ptr<uv_timer_t> timer(new uv_timer_t()); + CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get())); + timer->data = task_.release(); + CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0)); + scheduler_->timers_.insert(timer.release()); + } + + private: + DelayedTaskScheduler* scheduler_; + std::unique_ptr<Task> task_; + double delay_in_seconds_; + }; + + static void RunTask(uv_timer_t* timer) { + DelayedTaskScheduler* scheduler = + ContainerOf(&DelayedTaskScheduler::loop_, timer->loop); + scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer)); + } + + std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) { + std::unique_ptr<Task> task(static_cast<Task*>(timer->data)); + uv_timer_stop(timer); + uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) { + delete reinterpret_cast<uv_timer_t*>(handle); + }); + timers_.erase(timer); + return task; + } + + uv_sem_t ready_; + TaskQueue<v8::Task>* pending_worker_tasks_; + + TaskQueue<v8::Task> tasks_; + uv_loop_t loop_; + uv_async_t flush_tasks_; + std::unordered_set<uv_timer_t*> timers_; +}; + WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) { + delayed_task_scheduler_.reset( + new DelayedTaskScheduler(&pending_worker_tasks_)); + threads_.push_back(delayed_task_scheduler_->Start()); for (int i = 0; i < thread_pool_size; i++) { std::unique_ptr<uv_thread_t> t { new uv_thread_t() }; if (uv_thread_create(t.get(), PlatformWorkerThread, @@ -46,7 +167,7 @@ void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) { void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task, double delay_in_seconds) { - UNREACHABLE(); + delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds); } void WorkerThreadsTaskRunner::BlockingDrain() { @@ -55,6 +176,7 @@ void WorkerThreadsTaskRunner::BlockingDrain() { void WorkerThreadsTaskRunner::Shutdown() { pending_worker_tasks_.Stop(); + delayed_task_scheduler_->Stop(); for (size_t i = 0; i < threads_.size(); i++) { CHECK_EQ(0, uv_thread_join(threads_[i].get())); } |