Diffstat (limited to 'deps/v8/src/wasm/module-compiler.cc')
-rw-r--r--  deps/v8/src/wasm/module-compiler.cc | 901
1 file changed, 434 insertions(+), 467 deletions(-)
diff --git a/deps/v8/src/wasm/module-compiler.cc b/deps/v8/src/wasm/module-compiler.cc index 967e092b5b..82f86786a7 100644 --- a/deps/v8/src/wasm/module-compiler.cc +++ b/deps/v8/src/wasm/module-compiler.cc @@ -79,105 +79,24 @@ enum class CompileStrategy : uint8_t { kDefault = kEager, }; -// Background compile jobs hold a shared pointer to this token. The token is -// used to notify them that they should stop. As soon as they see this (after -// finishing their current compilation unit), they will stop. -// This allows to already remove the NativeModule without having to synchronize -// on background compile jobs. -class BackgroundCompileToken { - public: - explicit BackgroundCompileToken( - const std::shared_ptr<NativeModule>& native_module) - : native_module_(native_module) {} - - void Cancel() { - base::SharedMutexGuard<base::kExclusive> mutex_guard( - &compilation_scope_mutex_); - native_module_.reset(); - } - - private: - friend class BackgroundCompileScope; - - std::shared_ptr<NativeModule> StartScope() { - compilation_scope_mutex_.LockShared(); - return native_module_.lock(); - } - - // This private method can only be called via {BackgroundCompileScope}. - void SchedulePublishCode(NativeModule* native_module, - std::vector<std::unique_ptr<WasmCode>> codes) { - { - base::MutexGuard guard(&publish_mutex_); - if (publisher_running_) { - // Add new code to the queue and return. - publish_queue_.reserve(publish_queue_.size() + codes.size()); - for (auto& c : codes) publish_queue_.emplace_back(std::move(c)); - return; - } - publisher_running_ = true; - } - while (true) { - PublishCode(native_module, VectorOf(codes)); - codes.clear(); - - // Keep publishing new code that came in. - base::MutexGuard guard(&publish_mutex_); - DCHECK(publisher_running_); - if (publish_queue_.empty()) { - publisher_running_ = false; - return; - } - codes.swap(publish_queue_); - } - } - - void PublishCode(NativeModule*, Vector<std::unique_ptr<WasmCode>>); - - void ExitScope() { compilation_scope_mutex_.UnlockShared(); } - - // {compilation_scope_mutex_} protects {native_module_}. - base::SharedMutex compilation_scope_mutex_; - std::weak_ptr<NativeModule> native_module_; - - // {publish_mutex_} protects {publish_queue_} and {publisher_running_}. - base::Mutex publish_mutex_; - std::vector<std::unique_ptr<WasmCode>> publish_queue_; - bool publisher_running_ = false; -}; - class CompilationStateImpl; -// Keep these scopes short, as they hold the mutex of the token, which -// sequentializes all these scopes. The mutex is also acquired from foreground -// tasks, which should not be blocked for a long time. class BackgroundCompileScope { public: - explicit BackgroundCompileScope( - const std::shared_ptr<BackgroundCompileToken>& token) - : token_(token.get()), native_module_(token->StartScope()) {} - - ~BackgroundCompileScope() { token_->ExitScope(); } - - bool cancelled() const { return native_module_ == nullptr; } + explicit BackgroundCompileScope(std::weak_ptr<NativeModule> native_module) + : native_module_(native_module.lock()) {} - NativeModule* native_module() { - DCHECK(!cancelled()); + NativeModule* native_module() const { + DCHECK(native_module_); return native_module_.get(); } + inline CompilationStateImpl* compilation_state() const; - inline CompilationStateImpl* compilation_state(); - - // Call {SchedulePublishCode} via the {BackgroundCompileScope} to guarantee - // that the {NativeModule} stays alive. 
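// A minimal sketch (standard C++17, stand-in types, not the real V8
// classes) of the cancellation pattern this change switches to: background
// jobs hold only a std::weak_ptr<NativeModule> and pin the module for one
// short scope at a time, so dropping the last owning shared_ptr cancels all
// jobs without a dedicated token or extra synchronization.
#include <iostream>
#include <memory>

struct Module {};  // stand-in for NativeModule

class ScopedModuleAccess {  // stand-in for BackgroundCompileScope
 public:
  explicit ScopedModuleAccess(const std::weak_ptr<Module>& weak)
      : module_(weak.lock()) {}
  bool cancelled() const { return module_ == nullptr; }
  Module* get() const { return module_.get(); }

 private:
  std::shared_ptr<Module> module_;  // keeps the module alive in this scope
};

int main() {
  auto module = std::make_shared<Module>();
  std::weak_ptr<Module> handle = module;  // what a background job would hold
  std::cout << ScopedModuleAccess(handle).cancelled() << "\n";  // 0
  module.reset();  // owner released: existing handles now lock to null
  std::cout << ScopedModuleAccess(handle).cancelled() << "\n";  // 1
}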
- void SchedulePublishCode(std::vector<std::unique_ptr<WasmCode>> codes) { - token_->SchedulePublishCode(native_module_.get(), std::move(codes)); - } + bool cancelled() const; private: - BackgroundCompileToken* const token_; // Keep the native module alive while in this scope. - std::shared_ptr<NativeModule> const native_module_; + std::shared_ptr<NativeModule> native_module_; }; enum CompileBaselineOnly : bool { @@ -190,33 +109,74 @@ enum CompileBaselineOnly : bool { // runs empty. class CompilationUnitQueues { public: - explicit CompilationUnitQueues(int max_tasks, int num_declared_functions) - : queues_(max_tasks), top_tier_priority_units_queues_(max_tasks) { - DCHECK_LT(0, max_tasks); - for (int task_id = 0; task_id < max_tasks; ++task_id) { - queues_[task_id].next_steal_task_id = next_task_id(task_id); - } + // Public API for QueueImpl. + struct Queue { + bool ShouldPublish(int num_processed_units) const; + }; + + explicit CompilationUnitQueues(int num_declared_functions) + : num_declared_functions_(num_declared_functions) { + // Add one first queue, to add units to. + queues_.emplace_back(std::make_unique<QueueImpl>(0)); + for (auto& atomic_counter : num_units_) { std::atomic_init(&atomic_counter, size_t{0}); } - treated_ = std::make_unique<std::atomic<bool>[]>(num_declared_functions); + top_tier_compiled_ = + std::make_unique<std::atomic<bool>[]>(num_declared_functions); for (int i = 0; i < num_declared_functions; i++) { - std::atomic_init(&treated_.get()[i], false); + std::atomic_init(&top_tier_compiled_.get()[i], false); } } - base::Optional<WasmCompilationUnit> GetNextUnit( - int task_id, CompileBaselineOnly baseline_only) { - DCHECK_LE(0, task_id); - DCHECK_GT(queues_.size(), task_id); + Queue* GetQueueForTask(int task_id) { + int required_queues = task_id + 1; + { + base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_); + if (V8_LIKELY(static_cast<int>(queues_.size()) >= required_queues)) { + return queues_[task_id].get(); + } + } + + // Otherwise increase the number of queues. + base::SharedMutexGuard<base::kExclusive> queues_guard(&queues_mutex_); + int num_queues = static_cast<int>(queues_.size()); + while (num_queues < required_queues) { + int steal_from = num_queues + 1; + queues_.emplace_back(std::make_unique<QueueImpl>(steal_from)); + ++num_queues; + } + + // Update the {publish_limit}s of all queues. + + // We want background threads to publish regularly (to avoid contention when + // they are all publishing at the end). On the other side, each publishing + // has some overhead (part of it for synchronizing between threads), so it + // should not happen *too* often. Thus aim for 4-8 publishes per thread, but + // distribute it such that publishing is likely to happen at different + // times. + int units_per_thread = num_declared_functions_ / num_queues; + int min = std::max(10, units_per_thread / 8); + int queue_id = 0; + for (auto& queue : queues_) { + // Set a limit between {min} and {2*min}, but not smaller than {10}. + int limit = min + (min * queue_id / num_queues); + queue->publish_limit.store(limit, std::memory_order_relaxed); + ++queue_id; + } + + return queues_[task_id].get(); + } + base::Optional<WasmCompilationUnit> GetNextUnit( + Queue* queue, CompileBaselineOnly baseline_only) { // As long as any lower-tier units are outstanding we need to steal them // before executing own higher-tier units. int max_tier = baseline_only ? 
kBaseline : kTopTier; for (int tier = GetLowestTierWithUnits(); tier <= max_tier; ++tier) { - if (auto unit = GetNextUnitOfTier(task_id, tier)) { + if (auto unit = GetNextUnitOfTier(queue, tier)) { size_t old_units_count = num_units_[tier].fetch_sub(1, std::memory_order_relaxed); DCHECK_LE(1, old_units_count); @@ -233,13 +193,18 @@ class CompilationUnitQueues { DCHECK_LT(0, baseline_units.size() + top_tier_units.size()); // Add to the individual queues in a round-robin fashion. No special care is // taken to balance them; they will be balanced by work stealing. - int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed); - while (!next_queue_to_add.compare_exchange_weak( - queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) { - // Retry with updated {queue_to_add}. + QueueImpl* queue; + { + int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed); + base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_); + while (!next_queue_to_add.compare_exchange_weak( + queue_to_add, next_task_id(queue_to_add, queues_.size()), + std::memory_order_relaxed)) { + // Retry with updated {queue_to_add}. + } + queue = queues_[queue_to_add].get(); } - Queue* queue = &queues_[queue_to_add]; base::MutexGuard guard(&queue->mutex); base::Optional<base::MutexGuard> big_units_guard; for (auto pair : {std::make_pair(int{kBaseline}, baseline_units), @@ -265,22 +230,24 @@ class CompilationUnitQueues { } void AddTopTierPriorityUnit(WasmCompilationUnit unit, size_t priority) { + base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_); // Add to the individual queues in a round-robin fashion. No special care is // taken to balance them; they will be balanced by work stealing. We use // the same counter for this reason. int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed); while (!next_queue_to_add.compare_exchange_weak( - queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) { + queue_to_add, next_task_id(queue_to_add, queues_.size()), + std::memory_order_relaxed)) { // Retry with updated {queue_to_add}. } - TopTierPriorityUnitsQueue* queue = - &top_tier_priority_units_queues_[queue_to_add]; - base::MutexGuard guard(&queue->mutex); - + { + auto* queue = queues_[queue_to_add].get(); + base::MutexGuard guard(&queue->mutex); + queue->top_tier_priority_units.emplace(priority, unit); + } num_priority_units_.fetch_add(1, std::memory_order_relaxed); num_units_[kTopTier].fetch_add(1, std::memory_order_relaxed); - queue->units.emplace(priority, unit); } // Get the current total number of units in all queues. This is only a @@ -304,15 +271,6 @@ class CompilationUnitQueues { // order of their function body size. static constexpr size_t kBigUnitsLimit = 4096; - struct Queue { - base::Mutex mutex; - - // Protected by {mutex}: - std::vector<WasmCompilationUnit> units[kNumTiers]; - int next_steal_task_id; - // End of fields protected by {mutex}. - }; - struct BigUnit { BigUnit(size_t func_size, WasmCompilationUnit unit) : func_size{func_size}, unit(unit) {} @@ -351,28 +309,27 @@ class CompilationUnitQueues { std::priority_queue<BigUnit> units[kNumTiers]; }; - struct TopTierPriorityUnitsQueue { + struct QueueImpl : public Queue { + explicit QueueImpl(int next_steal_task_id) + : next_steal_task_id(next_steal_task_id) {} + + // Number of units after which the task processing this queue should publish + // compilation results. Updated (reduced, using relaxed ordering) when new + // queues are allocated. 
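// The publish-limit heuristic computed in GetQueueForTask above, extracted
// as a pure function for illustration (a sketch: the arithmetic matches the
// hunk, everything around it is scaffolding). Each queue gets a limit in
// [min, 2*min), staggered by queue id so threads tend to publish at
// different times, roughly 4-8 times per thread overall.
#include <algorithm>
#include <cstdio>

int PublishLimit(int num_declared_functions, int num_queues, int queue_id) {
  int units_per_thread = num_declared_functions / num_queues;
  int min = std::max(10, units_per_thread / 8);
  return min + (min * queue_id / num_queues);
}

int main() {
  for (int queue_id = 0; queue_id < 4; ++queue_id) {
    std::printf("queue %d publishes every %d units\n", queue_id,
                PublishLimit(1000, 4, queue_id));  // 31, 38, 46, 54
  }
}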
If there is only one thread running, we can delay + // publishing arbitrarily. + std::atomic<int> publish_limit{kMaxInt}; + base::Mutex mutex; - // Protected by {mutex}: - std::priority_queue<TopTierPriorityUnit> units; + // All fields below are protected by {mutex}. + std::vector<WasmCompilationUnit> units[kNumTiers]; + std::priority_queue<TopTierPriorityUnit> top_tier_priority_units; int next_steal_task_id; - // End of fields protected by {mutex}. }; - std::vector<Queue> queues_; - BigUnitsQueue big_units_queue_; - - std::vector<TopTierPriorityUnitsQueue> top_tier_priority_units_queues_; - - std::atomic<size_t> num_units_[kNumTiers]; - std::atomic<size_t> num_priority_units_{0}; - std::unique_ptr<std::atomic<bool>[]> treated_; - std::atomic<int> next_queue_to_add{0}; - - int next_task_id(int task_id) const { + int next_task_id(int task_id, size_t num_queues) const { int next = task_id + 1; - return next == static_cast<int>(queues_.size()) ? 0 : next; + return next == static_cast<int>(num_queues) ? 0 : next; } int GetLowestTierWithUnits() const { @@ -382,13 +339,13 @@ class CompilationUnitQueues { return kNumTiers; } - base::Optional<WasmCompilationUnit> GetNextUnitOfTier(int task_id, int tier) { - Queue* queue = &queues_[task_id]; + base::Optional<WasmCompilationUnit> GetNextUnitOfTier(Queue* public_queue, + int tier) { + QueueImpl* queue = static_cast<QueueImpl*>(public_queue); - // First check whether there is a priority unit. Execute that - // first. + // First check whether there is a priority unit. Execute that first. if (tier == kTopTier) { - if (auto unit = GetTopTierPriorityUnit(task_id)) { + if (auto unit = GetTopTierPriorityUnit(queue)) { return unit; } } @@ -411,12 +368,16 @@ class CompilationUnitQueues { // Try to steal from all other queues. If this succeeds, return one of the // stolen units. - size_t steal_trials = queues_.size(); - for (; steal_trials > 0; - --steal_trials, steal_task_id = next_task_id(steal_task_id)) { - if (steal_task_id == task_id) continue; - if (auto unit = StealUnitsAndGetFirst(task_id, steal_task_id, tier)) { - return unit; + { + base::SharedMutexGuard<base::kShared> guard(&queues_mutex_); + for (size_t steal_trials = 0; steal_trials < queues_.size(); + ++steal_trials, ++steal_task_id) { + if (steal_task_id >= static_cast<int>(queues_.size())) { + steal_task_id = 0; + } + if (auto unit = StealUnitsAndGetFirst(queue, steal_task_id, tier)) { + return unit; + } } } @@ -425,7 +386,7 @@ class CompilationUnitQueues { } base::Optional<WasmCompilationUnit> GetBigUnitOfTier(int tier) { - // Fast-path without locking. + // Fast path without locking. if (!big_units_queue_.has_units[tier].load(std::memory_order_relaxed)) { return {}; } @@ -439,25 +400,22 @@ class CompilationUnitQueues { return unit; } - base::Optional<WasmCompilationUnit> GetTopTierPriorityUnit(int task_id) { - // Fast-path without locking. + base::Optional<WasmCompilationUnit> GetTopTierPriorityUnit(QueueImpl* queue) { + // Fast path without locking. 
if (num_priority_units_.load(std::memory_order_relaxed) == 0) { return {}; } - TopTierPriorityUnitsQueue* queue = - &top_tier_priority_units_queues_[task_id]; - int steal_task_id; { base::MutexGuard mutex_guard(&queue->mutex); - while (!queue->units.empty()) { - auto unit = queue->units.top().unit; - queue->units.pop(); + while (!queue->top_tier_priority_units.empty()) { + auto unit = queue->top_tier_priority_units.top().unit; + queue->top_tier_priority_units.pop(); num_priority_units_.fetch_sub(1, std::memory_order_relaxed); - if (!treated_[unit.func_index()].exchange(true, - std::memory_order_relaxed)) { + if (!top_tier_compiled_[unit.func_index()].exchange( + true, std::memory_order_relaxed)) { return unit; } num_units_[kTopTier].fetch_sub(1, std::memory_order_relaxed); @@ -467,28 +425,34 @@ class CompilationUnitQueues { // Try to steal from all other queues. If this succeeds, return one of the // stolen units. - size_t steal_trials = queues_.size(); - for (; steal_trials > 0; - --steal_trials, steal_task_id = next_task_id(steal_task_id)) { - if (steal_task_id == task_id) continue; - if (auto unit = StealTopTierPriorityUnit(task_id, steal_task_id)) { - return unit; + { + base::SharedMutexGuard<base::kShared> guard(&queues_mutex_); + for (size_t steal_trials = 0; steal_trials < queues_.size(); + ++steal_trials, ++steal_task_id) { + if (steal_task_id >= static_cast<int>(queues_.size())) { + steal_task_id = 0; + } + if (auto unit = StealTopTierPriorityUnit(queue, steal_task_id)) { + return unit; + } } } return {}; } - // Steal units of {wanted_tier} from {steal_from_task_id} to {task_id}. Return + // Steal units of {wanted_tier} from {steal_from_task_id} to {queue}. Return // first stolen unit (rest put in queue of {task_id}), or {nullopt} if // {steal_from_task_id} had no units of {wanted_tier}. + // Hold a shared lock on {queues_mutex_} when calling this method. base::Optional<WasmCompilationUnit> StealUnitsAndGetFirst( - int task_id, int steal_from_task_id, int wanted_tier) { - DCHECK_NE(task_id, steal_from_task_id); + QueueImpl* queue, int steal_from_task_id, int wanted_tier) { + auto* steal_queue = queues_[steal_from_task_id].get(); + // Cannot steal from own queue. + if (steal_queue == queue) return {}; std::vector<WasmCompilationUnit> stolen; base::Optional<WasmCompilationUnit> returned_unit; { - Queue* steal_queue = &queues_[steal_from_task_id]; base::MutexGuard guard(&steal_queue->mutex); auto* steal_from_vector = &steal_queue->units[wanted_tier]; if (steal_from_vector->empty()) return {}; @@ -498,81 +462,65 @@ class CompilationUnitQueues { stolen.assign(steal_begin + 1, steal_from_vector->end()); steal_from_vector->erase(steal_begin, steal_from_vector->end()); } - Queue* queue = &queues_[task_id]; base::MutexGuard guard(&queue->mutex); auto* target_queue = &queue->units[wanted_tier]; target_queue->insert(target_queue->end(), stolen.begin(), stolen.end()); - queue->next_steal_task_id = next_task_id(steal_from_task_id); + queue->next_steal_task_id = steal_from_task_id + 1; return returned_unit; } // Steal one priority unit from {steal_from_task_id} to {task_id}. Return // stolen unit, or {nullopt} if {steal_from_task_id} had no priority units. + // Hold a shared lock on {queues_mutex_} when calling this method. 
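// The splitting strategy of StealUnitsAndGetFirst above, reduced to a
// sketch on plain vectors (Unit is a stand-in for WasmCompilationUnit):
// take the upper half of the victim's queue, execute the first stolen unit
// immediately, and keep the rest for the thief.
#include <cstdio>
#include <optional>
#include <vector>

using Unit = int;

std::optional<Unit> StealHalf(std::vector<Unit>& victim,
                              std::vector<Unit>& thief) {
  if (victim.empty()) return std::nullopt;
  auto steal_begin = victim.begin() + victim.size() / 2;
  Unit first = *steal_begin;  // returned for immediate execution
  thief.insert(thief.end(), steal_begin + 1, victim.end());
  victim.erase(steal_begin, victim.end());
  return first;
}

int main() {
  std::vector<Unit> victim{1, 2, 3, 4, 5}, thief;
  auto unit = StealHalf(victim, thief);
  std::printf("first: %d, victim keeps %zu, thief gains %zu\n", *unit,
              victim.size(), thief.size());  // first: 3, victim 2, thief 2
}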
base::Optional<WasmCompilationUnit> StealTopTierPriorityUnit( - int task_id, int steal_from_task_id) { - DCHECK_NE(task_id, steal_from_task_id); - + QueueImpl* queue, int steal_from_task_id) { + auto* steal_queue = queues_[steal_from_task_id].get(); + // Cannot steal from own queue. + if (steal_queue == queue) return {}; base::Optional<WasmCompilationUnit> returned_unit; { - TopTierPriorityUnitsQueue* steal_queue = - &top_tier_priority_units_queues_[steal_from_task_id]; base::MutexGuard guard(&steal_queue->mutex); while (true) { - if (steal_queue->units.empty()) return {}; + if (steal_queue->top_tier_priority_units.empty()) return {}; - auto unit = steal_queue->units.top().unit; - steal_queue->units.pop(); + auto unit = steal_queue->top_tier_priority_units.top().unit; + steal_queue->top_tier_priority_units.pop(); num_priority_units_.fetch_sub(1, std::memory_order_relaxed); - if (!treated_[unit.func_index()].exchange(true, - std::memory_order_relaxed)) { + if (!top_tier_compiled_[unit.func_index()].exchange( + true, std::memory_order_relaxed)) { returned_unit = unit; break; } num_units_[kTopTier].fetch_sub(1, std::memory_order_relaxed); } } - TopTierPriorityUnitsQueue* queue = - &top_tier_priority_units_queues_[task_id]; base::MutexGuard guard(&queue->mutex); - queue->next_steal_task_id = next_task_id(steal_from_task_id); + queue->next_steal_task_id = steal_from_task_id + 1; return returned_unit; } -}; - -// {JobHandle} is not thread safe in general (at least both the -// {DefaultJobHandle} and chromium's {base::JobHandle} are not). Hence, protect -// concurrent accesses via a mutex. -class ThreadSafeJobHandle { - public: - explicit ThreadSafeJobHandle(std::shared_ptr<JobHandle> job_handle) - : job_handle_(std::move(job_handle)) {} - void NotifyConcurrencyIncrease() { - base::MutexGuard guard(&mutex_); - job_handle_->NotifyConcurrencyIncrease(); - } + // {queues_mutex_} protects {queues_}. + base::SharedMutex queues_mutex_; + std::vector<std::unique_ptr<QueueImpl>> queues_; - void Join() { - base::MutexGuard guard(&mutex_); - job_handle_->Join(); - } + const int num_declared_functions_; - void Cancel() { - base::MutexGuard guard(&mutex_); - job_handle_->Cancel(); - } - - bool IsRunning() const { - base::MutexGuard guard(&mutex_); - return job_handle_->IsRunning(); - } + BigUnitsQueue big_units_queue_; - private: - mutable base::Mutex mutex_; - std::shared_ptr<JobHandle> job_handle_; + std::atomic<size_t> num_units_[kNumTiers]; + std::atomic<size_t> num_priority_units_{0}; + std::unique_ptr<std::atomic<bool>[]> top_tier_compiled_; + std::atomic<int> next_queue_to_add{0}; }; +bool CompilationUnitQueues::Queue::ShouldPublish( + int num_processed_units) const { + auto* queue = static_cast<const QueueImpl*>(this); + return num_processed_units >= + queue->publish_limit.load(std::memory_order_relaxed); +} + // The {CompilationStateImpl} keeps track of the compilation state of the // owning NativeModule, i.e. which functions are left to be compiled. // It contains a task manager to allow parallel and asynchronous background // compilation of functions. @@ -586,6 +534,7 @@ class CompilationStateImpl { // Cancel all background compilation, without waiting for compile tasks to // finish. void CancelCompilation(); + bool cancelled() const; // Initialize compilation progress. Set compilation tiers to expect for // baseline and top tier compilation.
Must be set before {AddCompilationUnits} @@ -618,8 +567,11 @@ class CompilationStateImpl { js_to_wasm_wrapper_units); void AddTopTierCompilationUnit(WasmCompilationUnit); void AddTopTierPriorityCompilationUnit(WasmCompilationUnit, size_t); + + CompilationUnitQueues::Queue* GetQueueForCompileTask(int task_id); + base::Optional<WasmCompilationUnit> GetNextCompilationUnit( - int task_id, CompileBaselineOnly baseline_only); + CompilationUnitQueues::Queue*, CompileBaselineOnly); std::shared_ptr<JSToWasmWrapperCompilationUnit> GetNextJSToWasmWrapperCompilationUnit(); @@ -629,13 +581,13 @@ class CompilationStateImpl { void OnFinishedUnits(Vector<WasmCode*>); void OnFinishedJSToWasmWrapperUnits(int num); - int GetFreeCompileTaskId(); - int GetUnpublishedUnitsLimits(int task_id); - void OnCompilationStopped(int task_id, const WasmFeatures& detected); + void OnCompilationStopped(const WasmFeatures& detected); void PublishDetectedFeatures(Isolate*); + void SchedulePublishCompilationResults( + std::vector<std::unique_ptr<WasmCode>> unpublished_code); // Ensure that a compilation job is running, and increase its concurrency if // needed. - void ScheduleCompileJobForNewUnits(int new_units); + void ScheduleCompileJobForNewUnits(); size_t NumOutstandingCompilations() const; @@ -687,8 +639,12 @@ class CompilationStateImpl { // Hold the {callbacks_mutex_} when calling this method. void TriggerCallbacks(base::EnumSet<CompilationEvent> additional_events = {}); + void PublishCompilationResults( + std::vector<std::unique_ptr<WasmCode>> unpublished_code); + void PublishCode(Vector<std::unique_ptr<WasmCode>> codes); + NativeModule* const native_module_; - const std::shared_ptr<BackgroundCompileToken> background_compile_token_; + std::weak_ptr<NativeModule> const native_module_weak_; const CompileMode compile_mode_; const std::shared_ptr<Counters> async_counters_; @@ -696,20 +652,9 @@ class CompilationStateImpl { // using relaxed semantics. std::atomic<bool> compile_failed_{false}; - // The atomic counter is shared with the compilation job. It's increased if - // more units are added, and decreased when the queue drops to zero. Hence - // it's an approximation of the current number of available units in the - // queue, but it's not updated after popping a single unit, because that - // would create too much contention. - // This counter is not used for synchronization, hence relaxed memory ordering - // can be used. The thread that increases the counter is the same that calls - // {NotifyConcurrencyIncrease} later. The only reduction of the counter is a - // drop to zero after a worker does not find any unit in the queue, and after - // that drop another check is executed to ensure that any left-over units are - // still processed. - std::shared_ptr<std::atomic<int>> scheduled_units_approximation_ = - std::make_shared<std::atomic<int>>(0); - const int max_compile_concurrency_ = 0; + // True if compilation was cancelled and worker threads should return. This + // flag can be updated and read using relaxed semantics. + std::atomic<bool> compile_cancelled_{false}; CompilationUnitQueues compilation_unit_queues_; @@ -729,7 +674,7 @@ class CompilationStateImpl { ////////////////////////////////////////////////////////////////////////////// // Protected by {mutex_}: - std::shared_ptr<ThreadSafeJobHandle> current_compile_job_; + std::shared_ptr<JobHandle> current_compile_job_; // Features detected to be used in this module. Features can be detected // as a module is being compiled. 
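// The {top_tier_compiled_} de-duplication used by GetTopTierPriorityUnit
// and StealTopTierPriorityUnit above, as a standalone sketch: several
// queues may hold a priority unit for the same function, and an atomic
// exchange lets exactly one thread claim the compilation.
#include <atomic>
#include <cstdio>
#include <memory>

int main() {
  constexpr int kNumFunctions = 4;
  auto top_tier_compiled =
      std::make_unique<std::atomic<bool>[]>(kNumFunctions);
  for (int i = 0; i < kNumFunctions; ++i) top_tier_compiled[i] = false;

  auto claim = [&](int func_index) {
    // True for exactly one caller per function; duplicates see false.
    return !top_tier_compiled[func_index].exchange(true,
                                                   std::memory_order_relaxed);
  };
  std::printf("%d %d\n", claim(2), claim(2));  // 1 0
}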
@@ -768,6 +713,11 @@ class CompilationStateImpl { // End of fields protected by {callbacks_mutex_}. ////////////////////////////////////////////////////////////////////////////// + // {publish_mutex_} protects {publish_queue_} and {publisher_running_}. + base::Mutex publish_mutex_; + std::vector<std::unique_ptr<WasmCode>> publish_queue_; + bool publisher_running_ = false; + // Encoding of fields in the {compilation_progress_} vector. using RequiredBaselineTierField = base::BitField8<ExecutionTier, 0, 2>; using RequiredTopTierField = base::BitField8<ExecutionTier, 2, 2>; @@ -782,21 +732,14 @@ const CompilationStateImpl* Impl(const CompilationState* compilation_state) { return reinterpret_cast<const CompilationStateImpl*>(compilation_state); } -CompilationStateImpl* BackgroundCompileScope::compilation_state() { - return Impl(native_module()->compilation_state()); +CompilationStateImpl* BackgroundCompileScope::compilation_state() const { + DCHECK(native_module_); + return Impl(native_module_->compilation_state()); } -void BackgroundCompileToken::PublishCode( - NativeModule* native_module, Vector<std::unique_ptr<WasmCode>> code) { - WasmCodeRefScope code_ref_scope; - std::vector<WasmCode*> published_code = native_module->PublishCode(code); - // Defer logging code in case wire bytes were not fully received yet. - if (native_module->HasWireBytes()) { - native_module->engine()->LogCode(VectorOf(published_code)); - } - - Impl(native_module->compilation_state()) - ->OnFinishedUnits(VectorOf(published_code)); +bool BackgroundCompileScope::cancelled() const { + return native_module_ == nullptr || + Impl(native_module_->compilation_state())->cancelled(); } void UpdateFeatureUseCounts(Isolate* isolate, const WasmFeatures& detected) { @@ -877,8 +820,9 @@ bool CompilationState::recompilation_finished() const { std::unique_ptr<CompilationState> CompilationState::New( const std::shared_ptr<NativeModule>& native_module, std::shared_ptr<Counters> async_counters) { - return std::unique_ptr<CompilationState>(reinterpret_cast<CompilationState*>( - new CompilationStateImpl(native_module, std::move(async_counters)))); + return std::unique_ptr<CompilationState>( + reinterpret_cast<CompilationState*>(new CompilationStateImpl( + std::move(native_module), std::move(async_counters)))); } // End of PIMPL implementation of {CompilationState}. 
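// The round-robin queue selection used by AddUnits and
// AddTopTierPriorityUnit earlier in this diff, sketched standalone: a
// relaxed CAS loop advances {next_queue_to_add}, and the value observed by
// the winning exchange picks the target queue. Work stealing later corrects
// any imbalance, so no stronger memory ordering is needed.
#include <atomic>
#include <cstdio>

std::atomic<int> next_queue_to_add{0};

int NextTaskId(int task_id, int num_queues) {
  int next = task_id + 1;
  return next == num_queues ? 0 : next;
}

int PickQueue(int num_queues) {
  int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
  while (!next_queue_to_add.compare_exchange_weak(
      queue_to_add, NextTaskId(queue_to_add, num_queues),
      std::memory_order_relaxed)) {
    // Retry with the updated {queue_to_add}.
  }
  return queue_to_add;
}

int main() {
  for (int i = 0; i < 6; ++i) std::printf("%d ", PickQueue(4));  // 0 1 2 3 0 1
  std::printf("\n");
}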
@@ -1215,31 +1159,31 @@ void TriggerTierUp(Isolate* isolate, NativeModule* native_module, namespace { void RecordStats(const Code code, Counters* counters) { - counters->wasm_generated_code_size()->Increment(code.body_size()); + counters->wasm_generated_code_size()->Increment(code.raw_body_size()); counters->wasm_reloc_size()->Increment(code.relocation_info().length()); } enum CompilationExecutionResult : int8_t { kNoMoreUnits, kYield }; CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits( - const std::shared_ptr<BackgroundCompileToken>& token, - JobDelegate* delegate) { + std::weak_ptr<NativeModule> native_module, JobDelegate* delegate) { std::shared_ptr<JSToWasmWrapperCompilationUnit> wrapper_unit = nullptr; int num_processed_wrappers = 0; { - BackgroundCompileScope compile_scope(token); + BackgroundCompileScope compile_scope(native_module); if (compile_scope.cancelled()) return kNoMoreUnits; wrapper_unit = compile_scope.compilation_state() ->GetNextJSToWasmWrapperCompilationUnit(); if (!wrapper_unit) return kNoMoreUnits; } + TRACE_EVENT0("v8.wasm", "wasm.JSToWasmWrapperCompilation"); while (true) { wrapper_unit->Execute(); ++num_processed_wrappers; bool yield = delegate && delegate->ShouldYield(); - BackgroundCompileScope compile_scope(token); + BackgroundCompileScope compile_scope(native_module); if (compile_scope.cancelled()) return kNoMoreUnits; if (yield || !(wrapper_unit = compile_scope.compilation_state() @@ -1251,16 +1195,35 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits( } } +namespace { +const char* GetCompilationEventName(const WasmCompilationUnit& unit, + const CompilationEnv& env) { + ExecutionTier tier = unit.tier(); + if (tier == ExecutionTier::kLiftoff) { + return "wasm.BaselineCompilation"; + } + if (tier == ExecutionTier::kTurbofan) { + return "wasm.TopTierCompilation"; + } + if (unit.func_index() < + static_cast<int>(env.module->num_imported_functions)) { + return "wasm.WasmToJSWrapperCompilation"; + } + return "wasm.OtherCompilation"; +} +} // namespace + // Run by the {BackgroundCompileJob} (on any thread). CompilationExecutionResult ExecuteCompilationUnits( - const std::shared_ptr<BackgroundCompileToken>& token, Counters* counters, + std::weak_ptr<NativeModule> native_module, Counters* counters, JobDelegate* delegate, CompileBaselineOnly baseline_only) { TRACE_EVENT0("v8.wasm", "wasm.ExecuteCompilationUnits"); // Execute JS to Wasm wrapper units first, so that they are ready to be // finalized by the main thread when the kFinishedBaselineCompilation event is // triggered. - if (ExecuteJSToWasmWrapperCompilationUnits(token, delegate) == kYield) { + if (ExecuteJSToWasmWrapperCompilationUnits(native_module, delegate) == + kYield) { return kYield; } @@ -1270,108 +1233,65 @@ CompilationExecutionResult ExecuteCompilationUnits( std::shared_ptr<WireBytesStorage> wire_bytes; std::shared_ptr<const WasmModule> module; WasmEngine* wasm_engine; - // The Jobs API guarantees that {GetTaskId} is less than the number of - // workers, and that the number of workers is less than or equal to the max - // compile concurrency, which makes the task_id safe to use as an index into - // the worker queues. - int task_id = delegate ? delegate->GetTaskId() : 0; - int unpublished_units_limit; + // Task 0 is any main thread (there might be multiple from multiple isolates), + // worker threads start at 1 (thus the "+ 1"). + int task_id = delegate ? 
(int{delegate->GetTaskId()} + 1) : 0; + DCHECK_LE(0, task_id); + CompilationUnitQueues::Queue* queue; base::Optional<WasmCompilationUnit> unit; WasmFeatures detected_features = WasmFeatures::None(); - auto stop = [&detected_features, - task_id](BackgroundCompileScope& compile_scope) { - compile_scope.compilation_state()->OnCompilationStopped(task_id, - detected_features); - }; - // Preparation (synchronized): Initialize the fields above and get the first // compilation unit. { - BackgroundCompileScope compile_scope(token); + BackgroundCompileScope compile_scope(native_module); if (compile_scope.cancelled()) return kNoMoreUnits; auto* compilation_state = compile_scope.compilation_state(); env.emplace(compile_scope.native_module()->CreateCompilationEnv()); wire_bytes = compilation_state->GetWireBytesStorage(); module = compile_scope.native_module()->shared_module(); wasm_engine = compile_scope.native_module()->engine(); - unpublished_units_limit = - compilation_state->GetUnpublishedUnitsLimits(task_id); - unit = compilation_state->GetNextCompilationUnit(task_id, baseline_only); - if (!unit) { - stop(compile_scope); - return kNoMoreUnits; - } + queue = compilation_state->GetQueueForCompileTask(task_id); + unit = compilation_state->GetNextCompilationUnit(queue, baseline_only); + if (!unit) return kNoMoreUnits; } TRACE_COMPILE("ExecuteCompilationUnits (task id %d)\n", task_id); std::vector<WasmCompilationResult> results_to_publish; - - auto publish_results = [&results_to_publish]( - BackgroundCompileScope* compile_scope) { - TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"), - "wasm.PublishCompilationResults", "num_results", - results_to_publish.size()); - if (results_to_publish.empty()) return; - std::vector<std::unique_ptr<WasmCode>> unpublished_code = - compile_scope->native_module()->AddCompiledCode( - VectorOf(results_to_publish)); - results_to_publish.clear(); - - // For import wrapper compilation units, add result to the cache. - const NativeModule* native_module = compile_scope->native_module(); - int num_imported_functions = native_module->num_imported_functions(); - WasmImportWrapperCache* cache = native_module->import_wrapper_cache(); - for (const auto& code : unpublished_code) { - int func_index = code->index(); - DCHECK_LE(0, func_index); - DCHECK_LT(func_index, native_module->num_functions()); - if (func_index < num_imported_functions) { - const FunctionSig* sig = - native_module->module()->functions[func_index].sig; - WasmImportWrapperCache::CacheKey key( - compiler::kDefaultImportCallKind, sig, - static_cast<int>(sig->parameter_count())); - // If two imported functions have the same key, only one of them should - // have been added as a compilation unit. So it is always the first time - // we compile a wrapper for this key here. - DCHECK_NULL((*cache)[key]); - (*cache)[key] = code.get(); - code->IncRef(); - } - } - - compile_scope->SchedulePublishCode(std::move(unpublished_code)); - }; - - bool compilation_failed = false; while (true) { - // (asynchronous): Execute the compilation. - WasmCompilationResult result = unit->ExecuteCompilation( - wasm_engine, &env.value(), wire_bytes, counters, &detected_features); - results_to_publish.emplace_back(std::move(result)); - - bool yield = delegate && delegate->ShouldYield(); - - // (synchronized): Publish the compilation result and get the next unit. 
- { - BackgroundCompileScope compile_scope(token); + ExecutionTier current_tier = unit->tier(); + const char* event_name = GetCompilationEventName(unit.value(), env.value()); + TRACE_EVENT0("v8.wasm", event_name); + while (unit->tier() == current_tier) { + // (asynchronous): Execute the compilation. + WasmCompilationResult result = unit->ExecuteCompilation( + wasm_engine, &env.value(), wire_bytes, counters, &detected_features); + results_to_publish.emplace_back(std::move(result)); + + bool yield = delegate && delegate->ShouldYield(); + + // (synchronized): Publish the compilation result and get the next unit. + BackgroundCompileScope compile_scope(native_module); if (compile_scope.cancelled()) return kNoMoreUnits; + if (!results_to_publish.back().succeeded()) { - // Compile error. compile_scope.compilation_state()->SetError(); - stop(compile_scope); - compilation_failed = true; - break; + return kNoMoreUnits; } - // Get next unit. + // Yield or get next unit. if (yield || !(unit = compile_scope.compilation_state()->GetNextCompilationUnit( - task_id, baseline_only))) { - publish_results(&compile_scope); - stop(compile_scope); + queue, baseline_only))) { + std::vector<std::unique_ptr<WasmCode>> unpublished_code = + compile_scope.native_module()->AddCompiledCode( + VectorOf(std::move(results_to_publish))); + results_to_publish.clear(); + compile_scope.compilation_state()->SchedulePublishCompilationResults( + std::move(unpublished_code)); + compile_scope.compilation_state()->OnCompilationStopped( + detected_features); return yield ? kYield : kNoMoreUnits; } @@ -1382,17 +1302,17 @@ CompilationExecutionResult ExecuteCompilationUnits( // Also publish after finishing a certain amount of units, to avoid // contention when all threads publish at the end. if (unit->tier() == ExecutionTier::kTurbofan || - static_cast<int>(results_to_publish.size()) >= - unpublished_units_limit) { - publish_results(&compile_scope); + queue->ShouldPublish(static_cast<int>(results_to_publish.size()))) { + std::vector<std::unique_ptr<WasmCode>> unpublished_code = + compile_scope.native_module()->AddCompiledCode( + VectorOf(std::move(results_to_publish))); + results_to_publish.clear(); + compile_scope.compilation_state()->SchedulePublishCompilationResults( + std::move(unpublished_code)); } } } - // We only get here if compilation failed. Other exits return directly. 
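// The shape of the worker loop in ExecuteCompilationUnits above, as a
// sketch with stand-in types: the expensive "compilation" runs outside any
// lock, and shared state is locked only briefly to hand over results and
// fetch the next unit, mirroring the (asynchronous)/(synchronized) comments
// in the hunk.
#include <mutex>
#include <optional>
#include <queue>
#include <vector>

struct SharedState {
  std::mutex mutex;
  std::queue<int> units;     // stand-in for the compilation unit queue
  std::vector<int> results;  // stand-in for published code
};

void RunWorker(SharedState& state) {
  std::optional<int> unit;
  {  // synchronized: fetch the first unit
    std::lock_guard<std::mutex> guard(state.mutex);
    if (state.units.empty()) return;
    unit = state.units.front();
    state.units.pop();
  }
  while (unit) {
    int result = *unit * *unit;  // asynchronous: "compile" outside the lock
    std::lock_guard<std::mutex> guard(state.mutex);  // synchronized: publish
    state.results.push_back(result);
    if (state.units.empty()) {
      unit.reset();
    } else {
      unit = state.units.front();
      state.units.pop();
    }
  }
}

int main() {
  SharedState state;
  for (int i = 1; i <= 4; ++i) state.units.push(i);
  RunWorker(state);
}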
- DCHECK(compilation_failed); - USE(compilation_failed); - token->Cancel(); - return kNoMoreUnits; + UNREACHABLE(); } using JSToWasmWrapperKey = std::pair<bool, FunctionSig>; @@ -1410,7 +1330,8 @@ int AddExportWrapperUnits(Isolate* isolate, WasmEngine* wasm_engine, if (keys.insert(key).second) { auto unit = std::make_shared<JSToWasmWrapperCompilationUnit>( isolate, wasm_engine, function.sig, native_module->module(), - function.imported, enabled_features); + function.imported, enabled_features, + JSToWasmWrapperCompilationUnit::kAllowGeneric); builder->AddJSToWasmWrapperUnit(std::move(unit)); } } @@ -1529,6 +1450,7 @@ class CompilationTimeCallback { histogram->AddSample(static_cast<int>(duration.InMicroseconds())); } + // TODO(sartang@microsoft.com): Remove wall_clock_time_in_us field v8::metrics::WasmModuleCompiled event{ (compile_mode_ != kSynchronous), // async (compile_mode_ == kStreaming), // streamed @@ -1538,7 +1460,8 @@ class CompilationTimeCallback { true, // success native_module->liftoff_code_size(), // code_size_in_bytes native_module->liftoff_bailout_count(), // liftoff_bailout_count - duration.InMicroseconds() // wall_clock_time_in_us + duration.InMicroseconds(), // wall_clock_time_in_us + duration.InMicroseconds() // wall_clock_duration_in_us }; metrics_recorder_->DelayMainThreadEvent(event, context_id_); } @@ -1549,7 +1472,8 @@ class CompilationTimeCallback { v8::metrics::WasmModuleTieredUp event{ FLAG_wasm_lazy_compilation, // lazy native_module->turbofan_code_size(), // code_size_in_bytes - duration.InMicroseconds() // wall_clock_time_in_us + duration.InMicroseconds(), // wall_clock_time_in_us + duration.InMicroseconds() // wall_clock_duration_in_us }; metrics_recorder_->DelayMainThreadEvent(event, context_id_); } @@ -1563,7 +1487,8 @@ class CompilationTimeCallback { false, // success native_module->liftoff_code_size(), // code_size_in_bytes native_module->liftoff_bailout_count(), // liftoff_bailout_count - duration.InMicroseconds() // wall_clock_time_in_us + duration.InMicroseconds(), // wall_clock_time_in_us + duration.InMicroseconds() // wall_clock_duration_in_us }; metrics_recorder_->DelayMainThreadEvent(event, context_id_); } @@ -1646,55 +1571,33 @@ void CompileNativeModule(Isolate* isolate, } } -// The runnable task that performs compilations in the background. -class BackgroundCompileJob : public JobTask { +class BackgroundCompileJob final : public JobTask { public: - explicit BackgroundCompileJob( - std::shared_ptr<BackgroundCompileToken> token, - std::shared_ptr<Counters> async_counters, - std::shared_ptr<std::atomic<int>> scheduled_units_approximation, - size_t max_concurrency) - : token_(std::move(token)), - async_counters_(std::move(async_counters)), - scheduled_units_approximation_( - std::move(scheduled_units_approximation)), - max_concurrency_(max_concurrency) {} + explicit BackgroundCompileJob(std::weak_ptr<NativeModule> native_module, + std::shared_ptr<Counters> async_counters) + : native_module_(std::move(native_module)), + async_counters_(std::move(async_counters)) {} void Run(JobDelegate* delegate) override { - if (ExecuteCompilationUnits(token_, async_counters_.get(), delegate, - kBaselineOrTopTier) == kYield) { - return; - } - // Otherwise we didn't find any more units to execute. Reduce the atomic - // counter of the approximated number of available units to zero, but then - // check whether any more units were added in the meantime, and increase - // back if necessary. 
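// Wrapper-unit de-duplication as in AddExportWrapperUnits above (and in
// CompileJsToWasmWrappers at the end of this diff): one compilation unit
// per distinct (is_import, signature) key, however many functions share a
// signature. A sketch; the string stands in for FunctionSig.
#include <cstdio>
#include <map>
#include <set>
#include <string>
#include <utility>

using Key = std::pair<bool, std::string>;

int main() {
  const Key functions[] = {
      {false, "i32(i32)"}, {false, "i32(i32)"}, {true, "void()"}};
  std::set<Key> keys;
  std::map<Key, int> units;  // key -> wrapper unit (stand-in payload)
  int next_unit = 0;
  for (const Key& key : functions) {
    if (keys.insert(key).second) units.emplace(key, next_unit++);
  }
  std::printf("%zu wrapper units for 3 functions\n", units.size());  // 2
}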
- scheduled_units_approximation_->store(0, std::memory_order_relaxed); - - BackgroundCompileScope scope(token_); - if (scope.cancelled()) return; - size_t outstanding_units = - scope.compilation_state()->NumOutstandingCompilations(); - if (outstanding_units == 0) return; - // On a race between this thread and the thread which scheduled the units, - // this might increase concurrency more than needed, which is fine. It - // will be reduced again when the first task finds no more work to do. - scope.compilation_state()->ScheduleCompileJobForNewUnits( - static_cast<int>(outstanding_units)); + ExecuteCompilationUnits(native_module_, async_counters_.get(), delegate, + kBaselineOrTopTier); } size_t GetMaxConcurrency(size_t worker_count) const override { - // {current_concurrency_} does not reflect the units that running workers - // are processing, thus add the current worker count to that number. - return std::min(max_concurrency_, - worker_count + scheduled_units_approximation_->load()); + BackgroundCompileScope scope(native_module_); + if (scope.cancelled()) return 0; + // NumOutstandingCompilations() does not reflect the units that running + // workers are processing, thus add the current worker count to that number. + size_t flag_limit = + static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks)); + return std::min( + flag_limit, + worker_count + scope.compilation_state()->NumOutstandingCompilations()); } private: - const std::shared_ptr<BackgroundCompileToken> token_; + const std::weak_ptr<NativeModule> native_module_; const std::shared_ptr<Counters> async_counters_; - const std::shared_ptr<std::atomic<int>> scheduled_units_approximation_; - const size_t max_concurrency_; }; } // namespace @@ -1974,7 +1877,8 @@ void AsyncCompileJob::FinishCompile(bool is_after_cache_hit) { !compilation_state->failed(), // success native_module_->liftoff_code_size(), // code_size_in_bytes native_module_->liftoff_bailout_count(), // liftoff_bailout_count - duration.InMicroseconds() // wall_clock_time_in_us + duration.InMicroseconds(), // wall_clock_time_in_us + duration.InMicroseconds() // wall_clock_duration_in_us }; isolate_->metrics_recorder()->DelayMainThreadEvent(event, context_id_); } @@ -2489,6 +2393,7 @@ void AsyncStreamingProcessor::FinishAsyncCompileJobWithError( job_->metrics_event_.module_size_in_bytes = job_->wire_bytes_.length(); job_->metrics_event_.function_count = num_functions_; job_->metrics_event_.wall_clock_time_in_us = duration.InMicroseconds(); + job_->metrics_event_.wall_clock_duration_in_us = duration.InMicroseconds(); job_->isolate_->metrics_recorder()->DelayMainThreadEvent(job_->metrics_event_, job_->context_id_); @@ -2580,6 +2485,8 @@ bool AsyncStreamingProcessor::ProcessCodeSectionHeader( return false; } + decoder_.set_code_section(offset, static_cast<uint32_t>(code_section_length)); + prefix_hash_ = base::hash_combine(prefix_hash_, static_cast<uint32_t>(code_section_length)); if (!wasm_engine_->GetStreamingCompilationOwnership(prefix_hash_)) { @@ -2601,7 +2508,6 @@ bool AsyncStreamingProcessor::ProcessCodeSectionHeader( job_->DoImmediately<AsyncCompileJob::PrepareAndStartCompile>( decoder_.shared_module(), false, code_size_estimate); - decoder_.set_code_section(offset, static_cast<uint32_t>(code_section_length)); auto* compilation_state = Impl(job_->native_module_->compilation_state()); compilation_state->SetWireBytesStorage(std::move(wire_bytes_storage)); DCHECK_EQ(job_->native_module_->module()->origin, kWasmOrigin); @@ -2710,6 +2616,7 @@ void 
AsyncStreamingProcessor::OnFinishedStream(OwnedVector<uint8_t> bytes) { job_->metrics_event_.module_size_in_bytes = job_->wire_bytes_.length(); job_->metrics_event_.function_count = num_functions_; job_->metrics_event_.wall_clock_time_in_us = duration.InMicroseconds(); + job_->metrics_event_.wall_clock_duration_in_us = duration.InMicroseconds(); job_->isolate_->metrics_recorder()->DelayMainThreadEvent(job_->metrics_event_, job_->context_id_); @@ -2804,37 +2711,31 @@ bool AsyncStreamingProcessor::Deserialize(Vector<const uint8_t> module_bytes, return true; } -// TODO(wasm): Try to avoid the {NumberOfWorkerThreads} calls, grow queues -// dynamically instead. -int GetMaxCompileConcurrency() { - int num_worker_threads = V8::GetCurrentPlatform()->NumberOfWorkerThreads(); - return std::min(FLAG_wasm_num_compilation_tasks, num_worker_threads); -} - CompilationStateImpl::CompilationStateImpl( const std::shared_ptr<NativeModule>& native_module, std::shared_ptr<Counters> async_counters) : native_module_(native_module.get()), - background_compile_token_( - std::make_shared<BackgroundCompileToken>(native_module)), + native_module_weak_(std::move(native_module)), compile_mode_(FLAG_wasm_tier_up && native_module->module()->origin == kWasmOrigin ? CompileMode::kTiering : CompileMode::kRegular), async_counters_(std::move(async_counters)), - max_compile_concurrency_(std::max(GetMaxCompileConcurrency(), 1)), - // Add one to the allowed number of parallel tasks, because the foreground - // task sometimes also contributes. - compilation_unit_queues_(max_compile_concurrency_ + 1, - native_module->num_functions()) {} + compilation_unit_queues_(native_module->num_functions()) {} void CompilationStateImpl::CancelCompilation() { - background_compile_token_->Cancel(); // No more callbacks after abort. base::MutexGuard callbacks_guard(&callbacks_mutex_); + // std::memory_order_relaxed is sufficient because no other state is + // synchronized with |compile_cancelled_|. + compile_cancelled_.store(true, std::memory_order_relaxed); callbacks_.clear(); } +bool CompilationStateImpl::cancelled() const { + return compile_cancelled_.load(std::memory_order_relaxed); +} + void CompilationStateImpl::InitializeCompilationProgress( bool lazy_module, int num_import_wrappers, int num_export_wrappers) { DCHECK(!failed()); @@ -2909,6 +2810,9 @@ void CompilationStateImpl::InitializeCompilationProgressAfterDeserialization() { RequiredBaselineTierField::encode(ExecutionTier::kTurbofan) | RequiredTopTierField::encode(ExecutionTier::kTurbofan) | ReachedTierField::encode(ExecutionTier::kTurbofan); + finished_events_.Add(CompilationEvent::kFinishedExportWrappers); + finished_events_.Add(CompilationEvent::kFinishedBaselineCompilation); + finished_events_.Add(CompilationEvent::kFinishedTopTierCompilation); compilation_progress_.assign(module->num_declared_functions, kProgressAfterDeserialization); } @@ -2956,7 +2860,9 @@ void CompilationStateImpl::InitializeRecompilation( // start yet, and new code will be kept tiered-down from the start. For // streaming compilation, there is a special path to tier down later, when // the module is complete. In any case, we don't need to recompile here. 
+ base::Optional<CompilationUnitBuilder> builder; if (compilation_progress_.size() > 0) { + builder.emplace(native_module_); const WasmModule* module = native_module_->module(); DCHECK_EQ(module->num_declared_functions, compilation_progress_.size()); DCHECK_GE(module->num_declared_functions, @@ -2971,15 +2877,13 @@ void CompilationStateImpl::InitializeRecompilation( : ExecutionTier::kTurbofan; int imported = module->num_imported_functions; // Generate necessary compilation units on the fly. - CompilationUnitBuilder builder(native_module_); for (int function_index : recompile_function_indexes) { DCHECK_LE(imported, function_index); int slot_index = function_index - imported; auto& progress = compilation_progress_[slot_index]; progress = MissingRecompilationField::update(progress, true); - builder.AddRecompilationUnit(function_index, new_tier); + builder->AddRecompilationUnit(function_index, new_tier); } - builder.Commit(); } // Trigger callback if module needs no recompilation. @@ -2987,6 +2891,12 @@ void CompilationStateImpl::InitializeRecompilation( TriggerCallbacks(base::EnumSet<CompilationEvent>( {CompilationEvent::kFinishedRecompilation})); } + + if (builder.has_value()) { + // Avoid holding lock while scheduling a compile job. + guard.reset(); + builder->Commit(); + } } void CompilationStateImpl::AddCallback(CompilationState::callback_t callback) { @@ -3017,13 +2927,15 @@ void CompilationStateImpl::AddCompilationUnits( compilation_unit_queues_.AddUnits(baseline_units, top_tier_units, native_module_->module()); } - js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(), - js_to_wasm_wrapper_units.begin(), - js_to_wasm_wrapper_units.end()); - - size_t total_units = baseline_units.size() + top_tier_units.size() + - js_to_wasm_wrapper_units.size(); - ScheduleCompileJobForNewUnits(static_cast<int>(total_units)); + if (!js_to_wasm_wrapper_units.empty()) { + // |js_to_wasm_wrapper_units_| can only be modified before background + // compilation started. + DCHECK(!current_compile_job_ || !current_compile_job_->IsRunning()); + js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(), + js_to_wasm_wrapper_units.begin(), + js_to_wasm_wrapper_units.end()); + } + ScheduleCompileJobForNewUnits(); } void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) { @@ -3033,7 +2945,7 @@ void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) { void CompilationStateImpl::AddTopTierPriorityCompilationUnit( WasmCompilationUnit unit, size_t priority) { compilation_unit_queues_.AddTopTierPriorityUnit(unit, priority); - ScheduleCompileJobForNewUnits(1); + ScheduleCompileJobForNewUnits(); } std::shared_ptr<JSToWasmWrapperCompilationUnit> @@ -3055,7 +2967,7 @@ void CompilationStateImpl::FinalizeJSToWasmWrappers( // optimization we keep the code space unlocked to avoid repeated unlocking // because many such wrapper are allocated in sequence below. 
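// The locking pattern introduced in InitializeRecompilation above, as a
// sketch with stand-in types: gather recompilation units while holding the
// callbacks mutex, then explicitly release it before committing, because
// committing schedules a compile job and must not run under the lock.
#include <mutex>
#include <optional>
#include <vector>

std::mutex callbacks_mutex;

void Commit(const std::vector<int>& units) { /* would schedule a job */ }

void InitializeRecompilation(const std::vector<int>& function_indexes) {
  std::optional<std::vector<int>> builder;
  std::unique_lock<std::mutex> guard(callbacks_mutex);
  if (!function_indexes.empty()) {
    builder.emplace(function_indexes);  // build the units under the lock
  }
  if (builder.has_value()) {
    guard.unlock();  // avoid holding the lock while scheduling
    Commit(*builder);
  }
}

int main() { InitializeRecompilation({1, 2, 3}); }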
TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"), - "wasm.FinalizeJSToWasmWrappers", "num_wrappers", + "wasm.FinalizeJSToWasmWrappers", "wrappers", js_to_wasm_wrapper_units_.size()); CodeSpaceMemoryModificationScope modification_scope(isolate->heap()); for (auto& unit : js_to_wasm_wrapper_units_) { @@ -3067,15 +2979,20 @@ void CompilationStateImpl::FinalizeJSToWasmWrappers( } } +CompilationUnitQueues::Queue* CompilationStateImpl::GetQueueForCompileTask( + int task_id) { + return compilation_unit_queues_.GetQueueForTask(task_id); +} + base::Optional<WasmCompilationUnit> CompilationStateImpl::GetNextCompilationUnit( - int task_id, CompileBaselineOnly baseline_only) { - return compilation_unit_queues_.GetNextUnit(task_id, baseline_only); + CompilationUnitQueues::Queue* queue, CompileBaselineOnly baseline_only) { + return compilation_unit_queues_.GetNextUnit(queue, baseline_only); } void CompilationStateImpl::OnFinishedUnits(Vector<WasmCode*> code_vector) { TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"), - "wasm.OnFinishedUnits", "num_units", code_vector.size()); + "wasm.OnFinishedUnits", "units", code_vector.size()); base::MutexGuard guard(&callbacks_mutex_); @@ -3230,24 +3147,7 @@ void CompilationStateImpl::TriggerCallbacks( } } -int CompilationStateImpl::GetUnpublishedUnitsLimits(int task_id) { - // We want background threads to publish regularly (to avoid contention when - // they are all publishing at the end). On the other side, each publishing has - // some overhead (part of it for synchronizing between threads), so it should - // not happen *too* often. - // Thus aim for 4-8 publishes per thread, but distribute it such that - // publishing is likely to happen at different times. - int units_per_thread = - static_cast<int>(native_module_->module()->num_declared_functions / - max_compile_concurrency_); - int min = units_per_thread / 8; - // Return something between {min} and {2*min}, but not smaller than {10}. - return std::max(10, min + (min * task_id / max_compile_concurrency_)); -} - -void CompilationStateImpl::OnCompilationStopped(int task_id, - const WasmFeatures& detected) { - DCHECK_GE(max_compile_concurrency_, task_id); +void CompilationStateImpl::OnCompilationStopped(const WasmFeatures& detected) { base::MutexGuard guard(&mutex_); detected_features_.Add(detected); } @@ -3260,40 +3160,104 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) { UpdateFeatureUseCounts(isolate, detected_features_); } -void CompilationStateImpl::ScheduleCompileJobForNewUnits(int new_units) { - // Increase the {scheduled_units_approximation_} counter and remember the old - // value to check whether it increased towards {max_compile_concurrency_}. - // In that case, we need to notify the compile job about the increased - // concurrency. - DCHECK_LT(0, new_units); - int old_units = scheduled_units_approximation_->fetch_add( - new_units, std::memory_order_relaxed); - bool concurrency_increased = old_units < max_compile_concurrency_; +void CompilationStateImpl::PublishCompilationResults( + std::vector<std::unique_ptr<WasmCode>> unpublished_code) { + if (unpublished_code.empty()) return; - base::MutexGuard guard(&mutex_); - if (current_compile_job_ && current_compile_job_->IsRunning()) { - if (concurrency_increased) { - current_compile_job_->NotifyConcurrencyIncrease(); + // For import wrapper compilation units, add result to the cache. 
+ int num_imported_functions = native_module_->num_imported_functions(); + WasmImportWrapperCache* cache = native_module_->import_wrapper_cache(); + for (const auto& code : unpublished_code) { + int func_index = code->index(); + DCHECK_LE(0, func_index); + DCHECK_LT(func_index, native_module_->num_functions()); + if (func_index < num_imported_functions) { + const FunctionSig* sig = + native_module_->module()->functions[func_index].sig; + WasmImportWrapperCache::CacheKey key( + compiler::kDefaultImportCallKind, sig, + static_cast<int>(sig->parameter_count())); + // If two imported functions have the same key, only one of them should + // have been added as a compilation unit. So it is always the first time + // we compile a wrapper for this key here. + DCHECK_NULL((*cache)[key]); + (*cache)[key] = code.get(); + code->IncRef(); } - return; } + PublishCode(VectorOf(unpublished_code)); +} + +void CompilationStateImpl::PublishCode(Vector<std::unique_ptr<WasmCode>> code) { + WasmCodeRefScope code_ref_scope; + std::vector<WasmCode*> published_code = + native_module_->PublishCode(std::move(code)); + // Defer logging code in case wire bytes were not fully received yet. + if (native_module_->HasWireBytes()) { + native_module_->engine()->LogCode(VectorOf(published_code)); + } + + OnFinishedUnits(VectorOf(std::move(published_code))); +} + +void CompilationStateImpl::SchedulePublishCompilationResults( + std::vector<std::unique_ptr<WasmCode>> unpublished_code) { + { + base::MutexGuard guard(&publish_mutex_); + if (publisher_running_) { + // Add new code to the queue and return. + publish_queue_.reserve(publish_queue_.size() + unpublished_code.size()); + for (auto& c : unpublished_code) { + publish_queue_.emplace_back(std::move(c)); + } + return; + } + publisher_running_ = true; + } + while (true) { + PublishCompilationResults(std::move(unpublished_code)); + unpublished_code.clear(); + + // Keep publishing new code that came in. + base::MutexGuard guard(&publish_mutex_); + DCHECK(publisher_running_); + if (publish_queue_.empty()) { + publisher_running_ = false; + return; + } + unpublished_code.swap(publish_queue_); + } +} + +void CompilationStateImpl::ScheduleCompileJobForNewUnits() { if (failed()) return; - std::unique_ptr<JobTask> new_compile_job = - std::make_unique<BackgroundCompileJob>( - background_compile_token_, async_counters_, - scheduled_units_approximation_, max_compile_concurrency_); - // TODO(wasm): Lower priority for TurboFan-only jobs. - std::shared_ptr<JobHandle> handle = V8::GetCurrentPlatform()->PostJob( - has_priority_ ? TaskPriority::kUserBlocking : TaskPriority::kUserVisible, - std::move(new_compile_job)); - native_module_->engine()->ShepherdCompileJobHandle(handle); - current_compile_job_ = - std::make_unique<ThreadSafeJobHandle>(std::move(handle)); + std::shared_ptr<JobHandle> new_job_handle; + { + base::MutexGuard guard(&mutex_); + if (current_compile_job_ && current_compile_job_->IsValid()) { + current_compile_job_->NotifyConcurrencyIncrease(); + return; + } + + std::unique_ptr<JobTask> new_compile_job = + std::make_unique<BackgroundCompileJob>(native_module_weak_, + async_counters_); + // TODO(wasm): Lower priority for TurboFan-only jobs. + new_job_handle = V8::GetCurrentPlatform()->PostJob( + has_priority_ ? TaskPriority::kUserBlocking + : TaskPriority::kUserVisible, + std::move(new_compile_job)); + current_compile_job_ = new_job_handle; + // Reset the priority. Later uses of the compilation state, e.g. for + // debugging, should compile with the default priority again. 
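// The single-publisher pattern of SchedulePublishCompilationResults above,
// sketched with ints standing in for unpublished WasmCode: the first caller
// becomes the publisher and keeps draining whatever later callers enqueue;
// everyone else appends to the queue and returns immediately.
#include <mutex>
#include <vector>

std::mutex publish_mutex;
std::vector<int> publish_queue;
bool publisher_running = false;

void Publish(const std::vector<int>& code) { /* would publish the code */ }

void SchedulePublish(std::vector<int> code) {
  {
    std::lock_guard<std::mutex> guard(publish_mutex);
    if (publisher_running) {
      publish_queue.insert(publish_queue.end(), code.begin(), code.end());
      return;  // the running publisher picks this up
    }
    publisher_running = true;
  }
  while (true) {
    Publish(code);  // runs without the lock held
    code.clear();
    std::lock_guard<std::mutex> guard(publish_mutex);
    if (publish_queue.empty()) {
      publisher_running = false;
      return;
    }
    code.swap(publish_queue);
  }
}

int main() { SchedulePublish({1, 2, 3}); }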
+ has_priority_ = false; + } - // Reset the priority. Later uses of the compilation state, e.g. for - // debugging, should compile with the default priority again. - has_priority_ = false; + if (new_job_handle) { + native_module_->engine()->ShepherdCompileJobHandle( + std::move(new_job_handle)); + } } size_t CompilationStateImpl::NumOutstandingCompilations() const { @@ -3307,12 +3271,14 @@ size_t CompilationStateImpl::NumOutstandingCompilations() const { } void CompilationStateImpl::SetError() { + compile_cancelled_.store(true, std::memory_order_relaxed); if (compile_failed_.exchange(true, std::memory_order_relaxed)) { return; // Already failed before. } base::MutexGuard callbacks_guard(&callbacks_mutex_); TriggerCallbacks(); + callbacks_.clear(); } void CompilationStateImpl::WaitForCompilationEvent( @@ -3330,7 +3296,7 @@ void CompilationStateImpl::WaitForCompilationEvent( } constexpr JobDelegate* kNoDelegate = nullptr; - ExecuteCompilationUnits(background_compile_token_, async_counters_.get(), + ExecuteCompilationUnits(native_module_weak_, async_counters_.get(), kNoDelegate, kBaselineOnly); compilation_event_semaphore->Wait(); } @@ -3350,7 +3316,6 @@ class CompileJSToWasmWrapperJob final : public JobTask { size_t max_concurrency) : queue_(queue), compilation_units_(compilation_units), - max_concurrency_(max_concurrency), outstanding_units_(queue->size()) {} void Run(JobDelegate* delegate) override { @@ -3366,14 +3331,15 @@ class CompileJSToWasmWrapperJob final : public JobTask { // {outstanding_units_} includes the units that other workers are currently // working on, so we can safely ignore the {worker_count} and just return // the current number of outstanding units. - return std::min(max_concurrency_, + size_t flag_limit = + static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks)); + return std::min(flag_limit, outstanding_units_.load(std::memory_order_relaxed)); } private: JSToWasmWrapperQueue* const queue_; JSToWasmWrapperUnitMap* const compilation_units_; - const size_t max_concurrency_; std::atomic<size_t> outstanding_units_; }; } // namespace @@ -3395,7 +3361,8 @@ void CompileJsToWasmWrappers(Isolate* isolate, const WasmModule* module, if (queue.insert(key)) { auto unit = std::make_unique<JSToWasmWrapperCompilationUnit>( isolate, isolate->wasm_engine(), function.sig, module, - function.imported, enabled_features); + function.imported, enabled_features, + JSToWasmWrapperCompilationUnit::kAllowGeneric); compilation_units.emplace(key, std::move(unit)); } } |
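// The concurrency callback shared in shape by BackgroundCompileJob and
// CompileJSToWasmWrapperJob above, as a sketch: desired parallelism is the
// outstanding work plus the workers already running, capped by a flag.
// {kFlagNumCompilationTasks} is a stand-in for FLAG_wasm_num_compilation_tasks.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdio>

constexpr int kFlagNumCompilationTasks = 8;
std::atomic<std::size_t> outstanding_units{20};

std::size_t GetMaxConcurrency(std::size_t worker_count) {
  std::size_t flag_limit =
      static_cast<std::size_t>(std::max(1, kFlagNumCompilationTasks));
  return std::min(flag_limit, worker_count + outstanding_units.load(
                                  std::memory_order_relaxed));
}

int main() {
  std::printf("%zu\n", GetMaxConcurrency(2));  // 8: capped by the flag
  outstanding_units = 3;
  std::printf("%zu\n", GetMaxConcurrency(2));  // 5: work + running workers
}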