Diffstat (limited to 'deps/v8/src/wasm/module-compiler.cc')
-rw-r--r--  deps/v8/src/wasm/module-compiler.cc  901
1 file changed, 434 insertions, 467 deletions
diff --git a/deps/v8/src/wasm/module-compiler.cc b/deps/v8/src/wasm/module-compiler.cc
index 967e092b5b..82f86786a7 100644
--- a/deps/v8/src/wasm/module-compiler.cc
+++ b/deps/v8/src/wasm/module-compiler.cc
@@ -79,105 +79,24 @@ enum class CompileStrategy : uint8_t {
kDefault = kEager,
};
-// Background compile jobs hold a shared pointer to this token. The token is
-// used to notify them that they should stop. As soon as they see this (after
-// finishing their current compilation unit), they will stop.
-// This allows to already remove the NativeModule without having to synchronize
-// on background compile jobs.
-class BackgroundCompileToken {
- public:
- explicit BackgroundCompileToken(
- const std::shared_ptr<NativeModule>& native_module)
- : native_module_(native_module) {}
-
- void Cancel() {
- base::SharedMutexGuard<base::kExclusive> mutex_guard(
- &compilation_scope_mutex_);
- native_module_.reset();
- }
-
- private:
- friend class BackgroundCompileScope;
-
- std::shared_ptr<NativeModule> StartScope() {
- compilation_scope_mutex_.LockShared();
- return native_module_.lock();
- }
-
- // This private method can only be called via {BackgroundCompileScope}.
- void SchedulePublishCode(NativeModule* native_module,
- std::vector<std::unique_ptr<WasmCode>> codes) {
- {
- base::MutexGuard guard(&publish_mutex_);
- if (publisher_running_) {
- // Add new code to the queue and return.
- publish_queue_.reserve(publish_queue_.size() + codes.size());
- for (auto& c : codes) publish_queue_.emplace_back(std::move(c));
- return;
- }
- publisher_running_ = true;
- }
- while (true) {
- PublishCode(native_module, VectorOf(codes));
- codes.clear();
-
- // Keep publishing new code that came in.
- base::MutexGuard guard(&publish_mutex_);
- DCHECK(publisher_running_);
- if (publish_queue_.empty()) {
- publisher_running_ = false;
- return;
- }
- codes.swap(publish_queue_);
- }
- }
-
- void PublishCode(NativeModule*, Vector<std::unique_ptr<WasmCode>>);
-
- void ExitScope() { compilation_scope_mutex_.UnlockShared(); }
-
- // {compilation_scope_mutex_} protects {native_module_}.
- base::SharedMutex compilation_scope_mutex_;
- std::weak_ptr<NativeModule> native_module_;
-
- // {publish_mutex_} protects {publish_queue_} and {publisher_running_}.
- base::Mutex publish_mutex_;
- std::vector<std::unique_ptr<WasmCode>> publish_queue_;
- bool publisher_running_ = false;
-};
-
class CompilationStateImpl;
-// Keep these scopes short, as they hold the mutex of the token, which
-// sequentializes all these scopes. The mutex is also acquired from foreground
-// tasks, which should not be blocked for a long time.
class BackgroundCompileScope {
public:
- explicit BackgroundCompileScope(
- const std::shared_ptr<BackgroundCompileToken>& token)
- : token_(token.get()), native_module_(token->StartScope()) {}
-
- ~BackgroundCompileScope() { token_->ExitScope(); }
-
- bool cancelled() const { return native_module_ == nullptr; }
+ explicit BackgroundCompileScope(std::weak_ptr<NativeModule> native_module)
+ : native_module_(native_module.lock()) {}
- NativeModule* native_module() {
- DCHECK(!cancelled());
+ NativeModule* native_module() const {
+ DCHECK(native_module_);
return native_module_.get();
}
+ inline CompilationStateImpl* compilation_state() const;
- inline CompilationStateImpl* compilation_state();
-
- // Call {SchedulePublishCode} via the {BackgroundCompileScope} to guarantee
- // that the {NativeModule} stays alive.
- void SchedulePublishCode(std::vector<std::unique_ptr<WasmCode>> codes) {
- token_->SchedulePublishCode(native_module_.get(), std::move(codes));
- }
+ bool cancelled() const;
private:
- BackgroundCompileToken* const token_;
// Keep the native module alive while in this scope.
- std::shared_ptr<NativeModule> const native_module_;
+ std::shared_ptr<NativeModule> native_module_;
};
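The rewritten {BackgroundCompileScope} keeps the {NativeModule} alive by locking a {std::weak_ptr} on entry; a null result doubles as the cancellation signal. Below is a minimal standalone sketch of that pattern ({Module} and {Scope} are illustrative stand-ins, not V8 types):

```cpp
#include <iostream>
#include <memory>

// Illustrative stand-ins for NativeModule / BackgroundCompileScope, showing
// only the weak_ptr-based lifetime pattern used by the new scope class.
struct Module {
  void CompileOneUnit() { std::cout << "compiled one unit\n"; }
};

class Scope {
 public:
  explicit Scope(std::weak_ptr<Module> weak) : module_(weak.lock()) {}
  // lock() returned null: the module is already gone, i.e. work is cancelled.
  bool cancelled() const { return module_ == nullptr; }
  Module* module() const { return module_.get(); }

 private:
  std::shared_ptr<Module> module_;  // keeps the module alive for this scope
};

int main() {
  auto module = std::make_shared<Module>();
  std::weak_ptr<Module> weak = module;
  {
    Scope scope(weak);  // module is kept alive inside the scope
    if (!scope.cancelled()) scope.module()->CompileOneUnit();
  }
  module.reset();  // the owner drops the module
  Scope late_scope(weak);
  std::cout << std::boolalpha << late_scope.cancelled() << "\n";  // true
}
```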
enum CompileBaselineOnly : bool {
@@ -190,33 +109,74 @@ enum CompileBaselineOnly : bool {
// runs empty.
class CompilationUnitQueues {
public:
- explicit CompilationUnitQueues(int max_tasks, int num_declared_functions)
- : queues_(max_tasks), top_tier_priority_units_queues_(max_tasks) {
- DCHECK_LT(0, max_tasks);
- for (int task_id = 0; task_id < max_tasks; ++task_id) {
- queues_[task_id].next_steal_task_id = next_task_id(task_id);
- }
+ // Public API for QueueImpl.
+ struct Queue {
+ bool ShouldPublish(int num_processed_units) const;
+ };
+
+ explicit CompilationUnitQueues(int num_declared_functions)
+ : num_declared_functions_(num_declared_functions) {
+ // Add one first queue, to add units to.
+ queues_.emplace_back(std::make_unique<QueueImpl>(0));
+
for (auto& atomic_counter : num_units_) {
std::atomic_init(&atomic_counter, size_t{0});
}
- treated_ = std::make_unique<std::atomic<bool>[]>(num_declared_functions);
+ top_tier_compiled_ =
+ std::make_unique<std::atomic<bool>[]>(num_declared_functions);
for (int i = 0; i < num_declared_functions; i++) {
- std::atomic_init(&treated_.get()[i], false);
+ std::atomic_init(&top_tier_compiled_.get()[i], false);
}
}
- base::Optional<WasmCompilationUnit> GetNextUnit(
- int task_id, CompileBaselineOnly baseline_only) {
- DCHECK_LE(0, task_id);
- DCHECK_GT(queues_.size(), task_id);
+ Queue* GetQueueForTask(int task_id) {
+ int required_queues = task_id + 1;
+ {
+ base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_);
+ if (V8_LIKELY(static_cast<int>(queues_.size()) >= required_queues)) {
+ return queues_[task_id].get();
+ }
+ }
+
+ // Otherwise increase the number of queues.
+ base::SharedMutexGuard<base::kExclusive> queues_guard(&queues_mutex_);
+ int num_queues = static_cast<int>(queues_.size());
+ while (num_queues < required_queues) {
+ int steal_from = num_queues + 1;
+ queues_.emplace_back(std::make_unique<QueueImpl>(steal_from));
+ ++num_queues;
+ }
+
+ // Update the {publish_limit}s of all queues.
+
+ // We want background threads to publish regularly (to avoid contention when
+  // they are all publishing at the end). On the other hand, each publish
+  // has some overhead (part of it for synchronizing between threads), so it
+ // should not happen *too* often. Thus aim for 4-8 publishes per thread, but
+ // distribute it such that publishing is likely to happen at different
+ // times.
+ int units_per_thread = num_declared_functions_ / num_queues;
+ int min = std::max(10, units_per_thread / 8);
+ int queue_id = 0;
+ for (auto& queue : queues_) {
+ // Set a limit between {min} and {2*min}, but not smaller than {10}.
+ int limit = min + (min * queue_id / num_queues);
+ queue->publish_limit.store(limit, std::memory_order_relaxed);
+ ++queue_id;
+ }
+
+ return queues_[task_id].get();
+ }
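A standalone sketch of the publish-limit arithmetic described in the comment above, with hypothetical module and worker counts, showing how each queue ends up with a staggered limit in [min, 2*min):

```cpp
#include <algorithm>
#include <cstdio>

// Recomputes the per-queue publish limits the way the comment above describes:
// roughly 4-8 publishes per thread, staggered so queues publish at different
// times. The module size and queue count are hypothetical.
int main() {
  const int num_declared_functions = 2000;
  const int num_queues = 4;

  int units_per_thread = num_declared_functions / num_queues;  // 500
  int min = std::max(10, units_per_thread / 8);                // 62
  for (int queue_id = 0; queue_id < num_queues; ++queue_id) {
    // Limit lies in [min, 2*min): each queue gets a slightly different value.
    int limit = min + (min * queue_id / num_queues);
    std::printf("queue %d publishes every %d units\n", queue_id, limit);
  }
}
```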
+ base::Optional<WasmCompilationUnit> GetNextUnit(
+ Queue* queue, CompileBaselineOnly baseline_only) {
// As long as any lower-tier units are outstanding we need to steal them
// before executing own higher-tier units.
int max_tier = baseline_only ? kBaseline : kTopTier;
for (int tier = GetLowestTierWithUnits(); tier <= max_tier; ++tier) {
- if (auto unit = GetNextUnitOfTier(task_id, tier)) {
+ if (auto unit = GetNextUnitOfTier(queue, tier)) {
size_t old_units_count =
num_units_[tier].fetch_sub(1, std::memory_order_relaxed);
DCHECK_LE(1, old_units_count);
@@ -233,13 +193,18 @@ class CompilationUnitQueues {
DCHECK_LT(0, baseline_units.size() + top_tier_units.size());
// Add to the individual queues in a round-robin fashion. No special care is
// taken to balance them; they will be balanced by work stealing.
- int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
- while (!next_queue_to_add.compare_exchange_weak(
- queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) {
- // Retry with updated {queue_to_add}.
+ QueueImpl* queue;
+ {
+ int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
+ base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_);
+ while (!next_queue_to_add.compare_exchange_weak(
+ queue_to_add, next_task_id(queue_to_add, queues_.size()),
+ std::memory_order_relaxed)) {
+ // Retry with updated {queue_to_add}.
+ }
+ queue = queues_[queue_to_add].get();
}
- Queue* queue = &queues_[queue_to_add];
base::MutexGuard guard(&queue->mutex);
base::Optional<base::MutexGuard> big_units_guard;
for (auto pair : {std::make_pair(int{kBaseline}, baseline_units),
@@ -265,22 +230,24 @@ class CompilationUnitQueues {
}
void AddTopTierPriorityUnit(WasmCompilationUnit unit, size_t priority) {
+ base::SharedMutexGuard<base::kShared> queues_guard(&queues_mutex_);
// Add to the individual queues in a round-robin fashion. No special care is
// taken to balance them; they will be balanced by work stealing. We use
// the same counter for this reason.
int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
while (!next_queue_to_add.compare_exchange_weak(
- queue_to_add, next_task_id(queue_to_add), std::memory_order_relaxed)) {
+ queue_to_add, next_task_id(queue_to_add, queues_.size()),
+ std::memory_order_relaxed)) {
// Retry with updated {queue_to_add}.
}
- TopTierPriorityUnitsQueue* queue =
- &top_tier_priority_units_queues_[queue_to_add];
- base::MutexGuard guard(&queue->mutex);
-
+ {
+ auto* queue = queues_[queue_to_add].get();
+ base::MutexGuard guard(&queue->mutex);
+ queue->top_tier_priority_units.emplace(priority, unit);
+ }
num_priority_units_.fetch_add(1, std::memory_order_relaxed);
num_units_[kTopTier].fetch_add(1, std::memory_order_relaxed);
- queue->units.emplace(priority, unit);
}
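Both {AddUnits} and {AddTopTierPriorityUnit} pick a queue with the same lock-free round-robin counter. A self-contained sketch of that counter (illustrative names, not the V8 types):

```cpp
#include <atomic>
#include <cstdio>

// Sketch of the lock-free round-robin counter used when distributing new
// units across queues (illustrative, not the V8 types).
std::atomic<int> next_queue_to_add{0};

int NextTaskId(int task_id, int num_queues) {
  int next = task_id + 1;
  return next == num_queues ? 0 : next;
}

int PickQueue(int num_queues) {
  int queue_to_add = next_queue_to_add.load(std::memory_order_relaxed);
  // compare_exchange_weak may fail spuriously or because another thread won
  // the race; either way {queue_to_add} is reloaded and we retry.
  while (!next_queue_to_add.compare_exchange_weak(
      queue_to_add, NextTaskId(queue_to_add, num_queues),
      std::memory_order_relaxed)) {
  }
  return queue_to_add;
}

int main() {
  for (int i = 0; i < 6; ++i) std::printf("%d ", PickQueue(3));  // 0 1 2 0 1 2
  std::printf("\n");
}
```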
// Get the current total number of units in all queues. This is only a
@@ -304,15 +271,6 @@ class CompilationUnitQueues {
// order of their function body size.
static constexpr size_t kBigUnitsLimit = 4096;
- struct Queue {
- base::Mutex mutex;
-
- // Protected by {mutex}:
- std::vector<WasmCompilationUnit> units[kNumTiers];
- int next_steal_task_id;
- // End of fields protected by {mutex}.
- };
-
struct BigUnit {
BigUnit(size_t func_size, WasmCompilationUnit unit)
: func_size{func_size}, unit(unit) {}
@@ -351,28 +309,27 @@ class CompilationUnitQueues {
std::priority_queue<BigUnit> units[kNumTiers];
};
- struct TopTierPriorityUnitsQueue {
+ struct QueueImpl : public Queue {
+ explicit QueueImpl(int next_steal_task_id)
+ : next_steal_task_id(next_steal_task_id) {}
+
+ // Number of units after which the task processing this queue should publish
+ // compilation results. Updated (reduced, using relaxed ordering) when new
+ // queues are allocated. If there is only one thread running, we can delay
+ // publishing arbitrarily.
+ std::atomic<int> publish_limit{kMaxInt};
+
base::Mutex mutex;
- // Protected by {mutex}:
- std::priority_queue<TopTierPriorityUnit> units;
+ // All fields below are protected by {mutex}.
+ std::vector<WasmCompilationUnit> units[kNumTiers];
+ std::priority_queue<TopTierPriorityUnit> top_tier_priority_units;
int next_steal_task_id;
- // End of fields protected by {mutex}.
};
- std::vector<Queue> queues_;
- BigUnitsQueue big_units_queue_;
-
- std::vector<TopTierPriorityUnitsQueue> top_tier_priority_units_queues_;
-
- std::atomic<size_t> num_units_[kNumTiers];
- std::atomic<size_t> num_priority_units_{0};
- std::unique_ptr<std::atomic<bool>[]> treated_;
- std::atomic<int> next_queue_to_add{0};
-
- int next_task_id(int task_id) const {
+ int next_task_id(int task_id, size_t num_queues) const {
int next = task_id + 1;
- return next == static_cast<int>(queues_.size()) ? 0 : next;
+ return next == static_cast<int>(num_queues) ? 0 : next;
}
int GetLowestTierWithUnits() const {
@@ -382,13 +339,13 @@ class CompilationUnitQueues {
return kNumTiers;
}
- base::Optional<WasmCompilationUnit> GetNextUnitOfTier(int task_id, int tier) {
- Queue* queue = &queues_[task_id];
+ base::Optional<WasmCompilationUnit> GetNextUnitOfTier(Queue* public_queue,
+ int tier) {
+ QueueImpl* queue = static_cast<QueueImpl*>(public_queue);
- // First check whether there is a priority unit. Execute that
- // first.
+ // First check whether there is a priority unit. Execute that first.
if (tier == kTopTier) {
- if (auto unit = GetTopTierPriorityUnit(task_id)) {
+ if (auto unit = GetTopTierPriorityUnit(queue)) {
return unit;
}
}
@@ -411,12 +368,16 @@ class CompilationUnitQueues {
// Try to steal from all other queues. If this succeeds, return one of the
// stolen units.
- size_t steal_trials = queues_.size();
- for (; steal_trials > 0;
- --steal_trials, steal_task_id = next_task_id(steal_task_id)) {
- if (steal_task_id == task_id) continue;
- if (auto unit = StealUnitsAndGetFirst(task_id, steal_task_id, tier)) {
- return unit;
+ {
+ base::SharedMutexGuard<base::kShared> guard(&queues_mutex_);
+ for (size_t steal_trials = 0; steal_trials < queues_.size();
+ ++steal_trials, ++steal_task_id) {
+ if (steal_task_id >= static_cast<int>(queues_.size())) {
+ steal_task_id = 0;
+ }
+ if (auto unit = StealUnitsAndGetFirst(queue, steal_task_id, tier)) {
+ return unit;
+ }
}
}
@@ -425,7 +386,7 @@ class CompilationUnitQueues {
}
base::Optional<WasmCompilationUnit> GetBigUnitOfTier(int tier) {
- // Fast-path without locking.
+ // Fast path without locking.
if (!big_units_queue_.has_units[tier].load(std::memory_order_relaxed)) {
return {};
}
@@ -439,25 +400,22 @@ class CompilationUnitQueues {
return unit;
}
- base::Optional<WasmCompilationUnit> GetTopTierPriorityUnit(int task_id) {
- // Fast-path without locking.
+ base::Optional<WasmCompilationUnit> GetTopTierPriorityUnit(QueueImpl* queue) {
+ // Fast path without locking.
if (num_priority_units_.load(std::memory_order_relaxed) == 0) {
return {};
}
- TopTierPriorityUnitsQueue* queue =
- &top_tier_priority_units_queues_[task_id];
-
int steal_task_id;
{
base::MutexGuard mutex_guard(&queue->mutex);
- while (!queue->units.empty()) {
- auto unit = queue->units.top().unit;
- queue->units.pop();
+ while (!queue->top_tier_priority_units.empty()) {
+ auto unit = queue->top_tier_priority_units.top().unit;
+ queue->top_tier_priority_units.pop();
num_priority_units_.fetch_sub(1, std::memory_order_relaxed);
- if (!treated_[unit.func_index()].exchange(true,
- std::memory_order_relaxed)) {
+ if (!top_tier_compiled_[unit.func_index()].exchange(
+ true, std::memory_order_relaxed)) {
return unit;
}
num_units_[kTopTier].fetch_sub(1, std::memory_order_relaxed);
@@ -467,28 +425,34 @@ class CompilationUnitQueues {
// Try to steal from all other queues. If this succeeds, return one of the
// stolen units.
- size_t steal_trials = queues_.size();
- for (; steal_trials > 0;
- --steal_trials, steal_task_id = next_task_id(steal_task_id)) {
- if (steal_task_id == task_id) continue;
- if (auto unit = StealTopTierPriorityUnit(task_id, steal_task_id)) {
- return unit;
+ {
+ base::SharedMutexGuard<base::kShared> guard(&queues_mutex_);
+ for (size_t steal_trials = 0; steal_trials < queues_.size();
+ ++steal_trials, ++steal_task_id) {
+ if (steal_task_id >= static_cast<int>(queues_.size())) {
+ steal_task_id = 0;
+ }
+ if (auto unit = StealTopTierPriorityUnit(queue, steal_task_id)) {
+ return unit;
+ }
}
}
return {};
}
- // Steal units of {wanted_tier} from {steal_from_task_id} to {task_id}. Return
+ // Steal units of {wanted_tier} from {steal_from_task_id} to {queue}. Return
// first stolen unit (rest put in queue of {task_id}), or {nullopt} if
// {steal_from_task_id} had no units of {wanted_tier}.
+ // Hold a shared lock on {queues_mutex_} when calling this method.
base::Optional<WasmCompilationUnit> StealUnitsAndGetFirst(
- int task_id, int steal_from_task_id, int wanted_tier) {
- DCHECK_NE(task_id, steal_from_task_id);
+ QueueImpl* queue, int steal_from_task_id, int wanted_tier) {
+ auto* steal_queue = queues_[steal_from_task_id].get();
+ // Cannot steal from own queue.
+ if (steal_queue == queue) return {};
std::vector<WasmCompilationUnit> stolen;
base::Optional<WasmCompilationUnit> returned_unit;
{
- Queue* steal_queue = &queues_[steal_from_task_id];
base::MutexGuard guard(&steal_queue->mutex);
auto* steal_from_vector = &steal_queue->units[wanted_tier];
if (steal_from_vector->empty()) return {};
@@ -498,81 +462,65 @@ class CompilationUnitQueues {
stolen.assign(steal_begin + 1, steal_from_vector->end());
steal_from_vector->erase(steal_begin, steal_from_vector->end());
}
- Queue* queue = &queues_[task_id];
base::MutexGuard guard(&queue->mutex);
auto* target_queue = &queue->units[wanted_tier];
target_queue->insert(target_queue->end(), stolen.begin(), stolen.end());
- queue->next_steal_task_id = next_task_id(steal_from_task_id);
+ queue->next_steal_task_id = steal_from_task_id + 1;
return returned_unit;
}
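The stealing policy above takes the victim's upper half: the first stolen unit is returned for immediate execution, the rest move to the thief's queue. A minimal sketch with plain ints standing in for compilation units and no locking shown:

```cpp
#include <cstdio>
#include <optional>
#include <vector>

// Sketch of the "steal half" policy: take the victim's first upper-half unit
// to execute immediately and move the rest of that half to the thief's queue.
std::optional<int> StealUnitsAndGetFirst(std::vector<int>& thief,
                                         std::vector<int>& victim) {
  if (victim.empty()) return std::nullopt;
  auto steal_begin = victim.begin() + victim.size() / 2;
  int first = *steal_begin;                               // executed right away
  thief.insert(thief.end(), steal_begin + 1, victim.end());
  victim.erase(steal_begin, victim.end());
  return first;
}

int main() {
  std::vector<int> victim{1, 2, 3, 4, 5, 6}, thief;
  auto unit = StealUnitsAndGetFirst(thief, victim);
  std::printf("stolen first: %d, thief now has %zu, victim keeps %zu\n",
              *unit, thief.size(), victim.size());  // 4, 2, 3
}
```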
// Steal one priority unit from {steal_from_task_id} to {task_id}. Return
// stolen unit, or {nullopt} if {steal_from_task_id} had no priority units.
+ // Hold a shared lock on {queues_mutex_} when calling this method.
base::Optional<WasmCompilationUnit> StealTopTierPriorityUnit(
- int task_id, int steal_from_task_id) {
- DCHECK_NE(task_id, steal_from_task_id);
-
+ QueueImpl* queue, int steal_from_task_id) {
+ auto* steal_queue = queues_[steal_from_task_id].get();
+ // Cannot steal from own queue.
+ if (steal_queue == queue) return {};
base::Optional<WasmCompilationUnit> returned_unit;
{
- TopTierPriorityUnitsQueue* steal_queue =
- &top_tier_priority_units_queues_[steal_from_task_id];
base::MutexGuard guard(&steal_queue->mutex);
while (true) {
- if (steal_queue->units.empty()) return {};
+ if (steal_queue->top_tier_priority_units.empty()) return {};
- auto unit = steal_queue->units.top().unit;
- steal_queue->units.pop();
+ auto unit = steal_queue->top_tier_priority_units.top().unit;
+ steal_queue->top_tier_priority_units.pop();
num_priority_units_.fetch_sub(1, std::memory_order_relaxed);
- if (!treated_[unit.func_index()].exchange(true,
- std::memory_order_relaxed)) {
+ if (!top_tier_compiled_[unit.func_index()].exchange(
+ true, std::memory_order_relaxed)) {
returned_unit = unit;
break;
}
num_units_[kTopTier].fetch_sub(1, std::memory_order_relaxed);
}
}
- TopTierPriorityUnitsQueue* queue =
- &top_tier_priority_units_queues_[task_id];
base::MutexGuard guard(&queue->mutex);
- queue->next_steal_task_id = next_task_id(steal_from_task_id);
+ queue->next_steal_task_id = steal_from_task_id + 1;
return returned_unit;
}
-};
-
-// {JobHandle} is not thread safe in general (at least both the
-// {DefaultJobHandle} and chromium's {base::JobHandle} are not). Hence, protect
-// concurrent accesses via a mutex.
-class ThreadSafeJobHandle {
- public:
- explicit ThreadSafeJobHandle(std::shared_ptr<JobHandle> job_handle)
- : job_handle_(std::move(job_handle)) {}
- void NotifyConcurrencyIncrease() {
- base::MutexGuard guard(&mutex_);
- job_handle_->NotifyConcurrencyIncrease();
- }
+  // {queues_mutex_} protects {queues_}.
+ base::SharedMutex queues_mutex_;
+ std::vector<std::unique_ptr<QueueImpl>> queues_;
- void Join() {
- base::MutexGuard guard(&mutex_);
- job_handle_->Join();
- }
+ const int num_declared_functions_;
- void Cancel() {
- base::MutexGuard guard(&mutex_);
- job_handle_->Cancel();
- }
-
- bool IsRunning() const {
- base::MutexGuard guard(&mutex_);
- return job_handle_->IsRunning();
- }
+ BigUnitsQueue big_units_queue_;
- private:
- mutable base::Mutex mutex_;
- std::shared_ptr<JobHandle> job_handle_;
+ std::atomic<size_t> num_units_[kNumTiers];
+ std::atomic<size_t> num_priority_units_{0};
+ std::unique_ptr<std::atomic<bool>[]> top_tier_compiled_;
+ std::atomic<int> next_queue_to_add{0};
};
+bool CompilationUnitQueues::Queue::ShouldPublish(
+ int num_processed_units) const {
+ auto* queue = static_cast<const QueueImpl*>(this);
+ return num_processed_units >=
+ queue->publish_limit.load(std::memory_order_relaxed);
+}
+
// The {CompilationStateImpl} keeps track of the compilation state of the
// owning NativeModule, i.e. which functions are left to be compiled.
// It contains a task manager to allow parallel and asynchronous background
@@ -586,6 +534,7 @@ class CompilationStateImpl {
// Cancel all background compilation, without waiting for compile tasks to
// finish.
void CancelCompilation();
+ bool cancelled() const;
// Initialize compilation progress. Set compilation tiers to expect for
// baseline and top tier compilation. Must be set before {AddCompilationUnits}
@@ -618,8 +567,11 @@ class CompilationStateImpl {
js_to_wasm_wrapper_units);
void AddTopTierCompilationUnit(WasmCompilationUnit);
void AddTopTierPriorityCompilationUnit(WasmCompilationUnit, size_t);
+
+ CompilationUnitQueues::Queue* GetQueueForCompileTask(int task_id);
+
base::Optional<WasmCompilationUnit> GetNextCompilationUnit(
- int task_id, CompileBaselineOnly baseline_only);
+ CompilationUnitQueues::Queue*, CompileBaselineOnly);
std::shared_ptr<JSToWasmWrapperCompilationUnit>
GetNextJSToWasmWrapperCompilationUnit();
@@ -629,13 +581,13 @@ class CompilationStateImpl {
void OnFinishedUnits(Vector<WasmCode*>);
void OnFinishedJSToWasmWrapperUnits(int num);
- int GetFreeCompileTaskId();
- int GetUnpublishedUnitsLimits(int task_id);
- void OnCompilationStopped(int task_id, const WasmFeatures& detected);
+ void OnCompilationStopped(const WasmFeatures& detected);
void PublishDetectedFeatures(Isolate*);
+ void SchedulePublishCompilationResults(
+ std::vector<std::unique_ptr<WasmCode>> unpublished_code);
// Ensure that a compilation job is running, and increase its concurrency if
// needed.
- void ScheduleCompileJobForNewUnits(int new_units);
+ void ScheduleCompileJobForNewUnits();
size_t NumOutstandingCompilations() const;
@@ -687,8 +639,12 @@ class CompilationStateImpl {
// Hold the {callbacks_mutex_} when calling this method.
void TriggerCallbacks(base::EnumSet<CompilationEvent> additional_events = {});
+ void PublishCompilationResults(
+ std::vector<std::unique_ptr<WasmCode>> unpublished_code);
+ void PublishCode(Vector<std::unique_ptr<WasmCode>> codes);
+
NativeModule* const native_module_;
- const std::shared_ptr<BackgroundCompileToken> background_compile_token_;
+ std::weak_ptr<NativeModule> const native_module_weak_;
const CompileMode compile_mode_;
const std::shared_ptr<Counters> async_counters_;
@@ -696,20 +652,9 @@ class CompilationStateImpl {
// using relaxed semantics.
std::atomic<bool> compile_failed_{false};
- // The atomic counter is shared with the compilation job. It's increased if
- // more units are added, and decreased when the queue drops to zero. Hence
- // it's an approximation of the current number of available units in the
- // queue, but it's not updated after popping a single unit, because that
- // would create too much contention.
- // This counter is not used for synchronization, hence relaxed memory ordering
- // can be used. The thread that increases the counter is the same that calls
- // {NotifyConcurrencyIncrease} later. The only reduction of the counter is a
- // drop to zero after a worker does not find any unit in the queue, and after
- // that drop another check is executed to ensure that any left-over units are
- // still processed.
- std::shared_ptr<std::atomic<int>> scheduled_units_approximation_ =
- std::make_shared<std::atomic<int>>(0);
- const int max_compile_concurrency_ = 0;
+ // True if compilation was cancelled and worker threads should return. This
+ // flag can be updated and read using relaxed semantics.
+ std::atomic<bool> compile_cancelled_{false};
CompilationUnitQueues compilation_unit_queues_;
@@ -729,7 +674,7 @@ class CompilationStateImpl {
//////////////////////////////////////////////////////////////////////////////
// Protected by {mutex_}:
- std::shared_ptr<ThreadSafeJobHandle> current_compile_job_;
+ std::shared_ptr<JobHandle> current_compile_job_;
// Features detected to be used in this module. Features can be detected
// as a module is being compiled.
@@ -768,6 +713,11 @@ class CompilationStateImpl {
// End of fields protected by {callbacks_mutex_}.
//////////////////////////////////////////////////////////////////////////////
+ // {publish_mutex_} protects {publish_queue_} and {publisher_running_}.
+ base::Mutex publish_mutex_;
+ std::vector<std::unique_ptr<WasmCode>> publish_queue_;
+ bool publisher_running_ = false;
+
// Encoding of fields in the {compilation_progress_} vector.
using RequiredBaselineTierField = base::BitField8<ExecutionTier, 0, 2>;
using RequiredTopTierField = base::BitField8<ExecutionTier, 2, 2>;
@@ -782,21 +732,14 @@ const CompilationStateImpl* Impl(const CompilationState* compilation_state) {
return reinterpret_cast<const CompilationStateImpl*>(compilation_state);
}
-CompilationStateImpl* BackgroundCompileScope::compilation_state() {
- return Impl(native_module()->compilation_state());
+CompilationStateImpl* BackgroundCompileScope::compilation_state() const {
+ DCHECK(native_module_);
+ return Impl(native_module_->compilation_state());
}
-void BackgroundCompileToken::PublishCode(
- NativeModule* native_module, Vector<std::unique_ptr<WasmCode>> code) {
- WasmCodeRefScope code_ref_scope;
- std::vector<WasmCode*> published_code = native_module->PublishCode(code);
- // Defer logging code in case wire bytes were not fully received yet.
- if (native_module->HasWireBytes()) {
- native_module->engine()->LogCode(VectorOf(published_code));
- }
-
- Impl(native_module->compilation_state())
- ->OnFinishedUnits(VectorOf(published_code));
+bool BackgroundCompileScope::cancelled() const {
+ return native_module_ == nullptr ||
+ Impl(native_module_->compilation_state())->cancelled();
}
void UpdateFeatureUseCounts(Isolate* isolate, const WasmFeatures& detected) {
@@ -877,8 +820,9 @@ bool CompilationState::recompilation_finished() const {
std::unique_ptr<CompilationState> CompilationState::New(
const std::shared_ptr<NativeModule>& native_module,
std::shared_ptr<Counters> async_counters) {
- return std::unique_ptr<CompilationState>(reinterpret_cast<CompilationState*>(
- new CompilationStateImpl(native_module, std::move(async_counters))));
+ return std::unique_ptr<CompilationState>(
+ reinterpret_cast<CompilationState*>(new CompilationStateImpl(
+ std::move(native_module), std::move(async_counters))));
}
// End of PIMPL implementation of {CompilationState}.
@@ -1215,31 +1159,31 @@ void TriggerTierUp(Isolate* isolate, NativeModule* native_module,
namespace {
void RecordStats(const Code code, Counters* counters) {
- counters->wasm_generated_code_size()->Increment(code.body_size());
+ counters->wasm_generated_code_size()->Increment(code.raw_body_size());
counters->wasm_reloc_size()->Increment(code.relocation_info().length());
}
enum CompilationExecutionResult : int8_t { kNoMoreUnits, kYield };
CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
- const std::shared_ptr<BackgroundCompileToken>& token,
- JobDelegate* delegate) {
+ std::weak_ptr<NativeModule> native_module, JobDelegate* delegate) {
std::shared_ptr<JSToWasmWrapperCompilationUnit> wrapper_unit = nullptr;
int num_processed_wrappers = 0;
{
- BackgroundCompileScope compile_scope(token);
+ BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
wrapper_unit = compile_scope.compilation_state()
->GetNextJSToWasmWrapperCompilationUnit();
if (!wrapper_unit) return kNoMoreUnits;
}
+ TRACE_EVENT0("v8.wasm", "wasm.JSToWasmWrapperCompilation");
while (true) {
wrapper_unit->Execute();
++num_processed_wrappers;
bool yield = delegate && delegate->ShouldYield();
- BackgroundCompileScope compile_scope(token);
+ BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
if (yield ||
!(wrapper_unit = compile_scope.compilation_state()
@@ -1251,16 +1195,35 @@ CompilationExecutionResult ExecuteJSToWasmWrapperCompilationUnits(
}
}
+namespace {
+const char* GetCompilationEventName(const WasmCompilationUnit& unit,
+ const CompilationEnv& env) {
+ ExecutionTier tier = unit.tier();
+ if (tier == ExecutionTier::kLiftoff) {
+ return "wasm.BaselineCompilation";
+ }
+ if (tier == ExecutionTier::kTurbofan) {
+ return "wasm.TopTierCompilation";
+ }
+ if (unit.func_index() <
+ static_cast<int>(env.module->num_imported_functions)) {
+ return "wasm.WasmToJSWrapperCompilation";
+ }
+ return "wasm.OtherCompilation";
+}
+} // namespace
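A standalone sketch of the tier-based event-name selection added above; the {Tier} enum and the free-function signature are simplified stand-ins for the V8 types:

```cpp
#include <cstdio>

// Standalone sketch of the event-name selection added above: trace events are
// named by tier, with imported-function wrappers reported separately.
enum class Tier { kLiftoff, kTurbofan, kOther };

const char* EventName(Tier tier, int func_index, int num_imported_functions) {
  if (tier == Tier::kLiftoff) return "wasm.BaselineCompilation";
  if (tier == Tier::kTurbofan) return "wasm.TopTierCompilation";
  if (func_index < num_imported_functions) {
    return "wasm.WasmToJSWrapperCompilation";
  }
  return "wasm.OtherCompilation";
}

int main() {
  std::printf("%s\n", EventName(Tier::kLiftoff, 5, 3));
  std::printf("%s\n", EventName(Tier::kOther, 1, 3));
}
```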
+
// Run by the {BackgroundCompileJob} (on any thread).
CompilationExecutionResult ExecuteCompilationUnits(
- const std::shared_ptr<BackgroundCompileToken>& token, Counters* counters,
+ std::weak_ptr<NativeModule> native_module, Counters* counters,
JobDelegate* delegate, CompileBaselineOnly baseline_only) {
TRACE_EVENT0("v8.wasm", "wasm.ExecuteCompilationUnits");
// Execute JS to Wasm wrapper units first, so that they are ready to be
// finalized by the main thread when the kFinishedBaselineCompilation event is
// triggered.
- if (ExecuteJSToWasmWrapperCompilationUnits(token, delegate) == kYield) {
+ if (ExecuteJSToWasmWrapperCompilationUnits(native_module, delegate) ==
+ kYield) {
return kYield;
}
@@ -1270,108 +1233,65 @@ CompilationExecutionResult ExecuteCompilationUnits(
std::shared_ptr<WireBytesStorage> wire_bytes;
std::shared_ptr<const WasmModule> module;
WasmEngine* wasm_engine;
- // The Jobs API guarantees that {GetTaskId} is less than the number of
- // workers, and that the number of workers is less than or equal to the max
- // compile concurrency, which makes the task_id safe to use as an index into
- // the worker queues.
- int task_id = delegate ? delegate->GetTaskId() : 0;
- int unpublished_units_limit;
+ // Task 0 is any main thread (there might be multiple from multiple isolates),
+ // worker threads start at 1 (thus the "+ 1").
+ int task_id = delegate ? (int{delegate->GetTaskId()} + 1) : 0;
+ DCHECK_LE(0, task_id);
+ CompilationUnitQueues::Queue* queue;
base::Optional<WasmCompilationUnit> unit;
WasmFeatures detected_features = WasmFeatures::None();
- auto stop = [&detected_features,
- task_id](BackgroundCompileScope& compile_scope) {
- compile_scope.compilation_state()->OnCompilationStopped(task_id,
- detected_features);
- };
-
// Preparation (synchronized): Initialize the fields above and get the first
// compilation unit.
{
- BackgroundCompileScope compile_scope(token);
+ BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
auto* compilation_state = compile_scope.compilation_state();
env.emplace(compile_scope.native_module()->CreateCompilationEnv());
wire_bytes = compilation_state->GetWireBytesStorage();
module = compile_scope.native_module()->shared_module();
wasm_engine = compile_scope.native_module()->engine();
- unpublished_units_limit =
- compilation_state->GetUnpublishedUnitsLimits(task_id);
- unit = compilation_state->GetNextCompilationUnit(task_id, baseline_only);
- if (!unit) {
- stop(compile_scope);
- return kNoMoreUnits;
- }
+ queue = compilation_state->GetQueueForCompileTask(task_id);
+ unit = compilation_state->GetNextCompilationUnit(queue, baseline_only);
+ if (!unit) return kNoMoreUnits;
}
TRACE_COMPILE("ExecuteCompilationUnits (task id %d)\n", task_id);
std::vector<WasmCompilationResult> results_to_publish;
-
- auto publish_results = [&results_to_publish](
- BackgroundCompileScope* compile_scope) {
- TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"),
- "wasm.PublishCompilationResults", "num_results",
- results_to_publish.size());
- if (results_to_publish.empty()) return;
- std::vector<std::unique_ptr<WasmCode>> unpublished_code =
- compile_scope->native_module()->AddCompiledCode(
- VectorOf(results_to_publish));
- results_to_publish.clear();
-
- // For import wrapper compilation units, add result to the cache.
- const NativeModule* native_module = compile_scope->native_module();
- int num_imported_functions = native_module->num_imported_functions();
- WasmImportWrapperCache* cache = native_module->import_wrapper_cache();
- for (const auto& code : unpublished_code) {
- int func_index = code->index();
- DCHECK_LE(0, func_index);
- DCHECK_LT(func_index, native_module->num_functions());
- if (func_index < num_imported_functions) {
- const FunctionSig* sig =
- native_module->module()->functions[func_index].sig;
- WasmImportWrapperCache::CacheKey key(
- compiler::kDefaultImportCallKind, sig,
- static_cast<int>(sig->parameter_count()));
- // If two imported functions have the same key, only one of them should
- // have been added as a compilation unit. So it is always the first time
- // we compile a wrapper for this key here.
- DCHECK_NULL((*cache)[key]);
- (*cache)[key] = code.get();
- code->IncRef();
- }
- }
-
- compile_scope->SchedulePublishCode(std::move(unpublished_code));
- };
-
- bool compilation_failed = false;
while (true) {
- // (asynchronous): Execute the compilation.
- WasmCompilationResult result = unit->ExecuteCompilation(
- wasm_engine, &env.value(), wire_bytes, counters, &detected_features);
- results_to_publish.emplace_back(std::move(result));
-
- bool yield = delegate && delegate->ShouldYield();
-
- // (synchronized): Publish the compilation result and get the next unit.
- {
- BackgroundCompileScope compile_scope(token);
+ ExecutionTier current_tier = unit->tier();
+ const char* event_name = GetCompilationEventName(unit.value(), env.value());
+ TRACE_EVENT0("v8.wasm", event_name);
+ while (unit->tier() == current_tier) {
+ // (asynchronous): Execute the compilation.
+ WasmCompilationResult result = unit->ExecuteCompilation(
+ wasm_engine, &env.value(), wire_bytes, counters, &detected_features);
+ results_to_publish.emplace_back(std::move(result));
+
+ bool yield = delegate && delegate->ShouldYield();
+
+ // (synchronized): Publish the compilation result and get the next unit.
+ BackgroundCompileScope compile_scope(native_module);
if (compile_scope.cancelled()) return kNoMoreUnits;
+
if (!results_to_publish.back().succeeded()) {
- // Compile error.
compile_scope.compilation_state()->SetError();
- stop(compile_scope);
- compilation_failed = true;
- break;
+ return kNoMoreUnits;
}
- // Get next unit.
+ // Yield or get next unit.
if (yield ||
!(unit = compile_scope.compilation_state()->GetNextCompilationUnit(
- task_id, baseline_only))) {
- publish_results(&compile_scope);
- stop(compile_scope);
+ queue, baseline_only))) {
+ std::vector<std::unique_ptr<WasmCode>> unpublished_code =
+ compile_scope.native_module()->AddCompiledCode(
+ VectorOf(std::move(results_to_publish)));
+ results_to_publish.clear();
+ compile_scope.compilation_state()->SchedulePublishCompilationResults(
+ std::move(unpublished_code));
+ compile_scope.compilation_state()->OnCompilationStopped(
+ detected_features);
return yield ? kYield : kNoMoreUnits;
}
@@ -1382,17 +1302,17 @@ CompilationExecutionResult ExecuteCompilationUnits(
// Also publish after finishing a certain amount of units, to avoid
// contention when all threads publish at the end.
if (unit->tier() == ExecutionTier::kTurbofan ||
- static_cast<int>(results_to_publish.size()) >=
- unpublished_units_limit) {
- publish_results(&compile_scope);
+ queue->ShouldPublish(static_cast<int>(results_to_publish.size()))) {
+ std::vector<std::unique_ptr<WasmCode>> unpublished_code =
+ compile_scope.native_module()->AddCompiledCode(
+ VectorOf(std::move(results_to_publish)));
+ results_to_publish.clear();
+ compile_scope.compilation_state()->SchedulePublishCompilationResults(
+ std::move(unpublished_code));
}
}
}
- // We only get here if compilation failed. Other exits return directly.
- DCHECK(compilation_failed);
- USE(compilation_failed);
- token->Cancel();
- return kNoMoreUnits;
+ UNREACHABLE();
}
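Condensed, standalone sketch of the worker-loop shape above: compile units, batch the results, and publish either when the per-queue limit ({ShouldPublish}) is reached or when the worker runs out of units. Everything here is illustrative, not V8 API:

```cpp
#include <cstdio>
#include <optional>
#include <queue>
#include <vector>

struct Unit { int func_index; };

std::queue<Unit> work;        // stands in for CompilationUnitQueues
const int publish_limit = 3;  // stands in for Queue::ShouldPublish

std::optional<Unit> GetNextUnit() {
  if (work.empty()) return std::nullopt;
  Unit u = work.front();
  work.pop();
  return u;
}

void Publish(std::vector<int>* results) {
  std::printf("publishing %zu results\n", results->size());
  results->clear();
}

int main() {
  for (int i = 0; i < 7; ++i) work.push(Unit{i});
  std::vector<int> results_to_publish;
  for (auto unit = GetNextUnit(); unit; unit = GetNextUnit()) {
    results_to_publish.push_back(unit->func_index);  // "compile" the unit
    if (static_cast<int>(results_to_publish.size()) >= publish_limit) {
      Publish(&results_to_publish);                  // periodic publish
    }
  }
  if (!results_to_publish.empty()) Publish(&results_to_publish);  // final flush
}
```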
using JSToWasmWrapperKey = std::pair<bool, FunctionSig>;
@@ -1410,7 +1330,8 @@ int AddExportWrapperUnits(Isolate* isolate, WasmEngine* wasm_engine,
if (keys.insert(key).second) {
auto unit = std::make_shared<JSToWasmWrapperCompilationUnit>(
isolate, wasm_engine, function.sig, native_module->module(),
- function.imported, enabled_features);
+ function.imported, enabled_features,
+ JSToWasmWrapperCompilationUnit::kAllowGeneric);
builder->AddJSToWasmWrapperUnit(std::move(unit));
}
}
@@ -1529,6 +1450,7 @@ class CompilationTimeCallback {
histogram->AddSample(static_cast<int>(duration.InMicroseconds()));
}
+ // TODO(sartang@microsoft.com): Remove wall_clock_time_in_us field
v8::metrics::WasmModuleCompiled event{
(compile_mode_ != kSynchronous), // async
(compile_mode_ == kStreaming), // streamed
@@ -1538,7 +1460,8 @@ class CompilationTimeCallback {
true, // success
native_module->liftoff_code_size(), // code_size_in_bytes
native_module->liftoff_bailout_count(), // liftoff_bailout_count
- duration.InMicroseconds() // wall_clock_time_in_us
+ duration.InMicroseconds(), // wall_clock_time_in_us
+ duration.InMicroseconds() // wall_clock_duration_in_us
};
metrics_recorder_->DelayMainThreadEvent(event, context_id_);
}
@@ -1549,7 +1472,8 @@ class CompilationTimeCallback {
v8::metrics::WasmModuleTieredUp event{
FLAG_wasm_lazy_compilation, // lazy
native_module->turbofan_code_size(), // code_size_in_bytes
- duration.InMicroseconds() // wall_clock_time_in_us
+ duration.InMicroseconds(), // wall_clock_time_in_us
+ duration.InMicroseconds() // wall_clock_duration_in_us
};
metrics_recorder_->DelayMainThreadEvent(event, context_id_);
}
@@ -1563,7 +1487,8 @@ class CompilationTimeCallback {
false, // success
native_module->liftoff_code_size(), // code_size_in_bytes
native_module->liftoff_bailout_count(), // liftoff_bailout_count
- duration.InMicroseconds() // wall_clock_time_in_us
+ duration.InMicroseconds(), // wall_clock_time_in_us
+ duration.InMicroseconds() // wall_clock_duration_in_us
};
metrics_recorder_->DelayMainThreadEvent(event, context_id_);
}
@@ -1646,55 +1571,33 @@ void CompileNativeModule(Isolate* isolate,
}
}
-// The runnable task that performs compilations in the background.
-class BackgroundCompileJob : public JobTask {
+class BackgroundCompileJob final : public JobTask {
public:
- explicit BackgroundCompileJob(
- std::shared_ptr<BackgroundCompileToken> token,
- std::shared_ptr<Counters> async_counters,
- std::shared_ptr<std::atomic<int>> scheduled_units_approximation,
- size_t max_concurrency)
- : token_(std::move(token)),
- async_counters_(std::move(async_counters)),
- scheduled_units_approximation_(
- std::move(scheduled_units_approximation)),
- max_concurrency_(max_concurrency) {}
+ explicit BackgroundCompileJob(std::weak_ptr<NativeModule> native_module,
+ std::shared_ptr<Counters> async_counters)
+ : native_module_(std::move(native_module)),
+ async_counters_(std::move(async_counters)) {}
void Run(JobDelegate* delegate) override {
- if (ExecuteCompilationUnits(token_, async_counters_.get(), delegate,
- kBaselineOrTopTier) == kYield) {
- return;
- }
- // Otherwise we didn't find any more units to execute. Reduce the atomic
- // counter of the approximated number of available units to zero, but then
- // check whether any more units were added in the meantime, and increase
- // back if necessary.
- scheduled_units_approximation_->store(0, std::memory_order_relaxed);
-
- BackgroundCompileScope scope(token_);
- if (scope.cancelled()) return;
- size_t outstanding_units =
- scope.compilation_state()->NumOutstandingCompilations();
- if (outstanding_units == 0) return;
- // On a race between this thread and the thread which scheduled the units,
- // this might increase concurrency more than needed, which is fine. It
- // will be reduced again when the first task finds no more work to do.
- scope.compilation_state()->ScheduleCompileJobForNewUnits(
- static_cast<int>(outstanding_units));
+ ExecuteCompilationUnits(native_module_, async_counters_.get(), delegate,
+ kBaselineOrTopTier);
}
size_t GetMaxConcurrency(size_t worker_count) const override {
- // {current_concurrency_} does not reflect the units that running workers
- // are processing, thus add the current worker count to that number.
- return std::min(max_concurrency_,
- worker_count + scheduled_units_approximation_->load());
+ BackgroundCompileScope scope(native_module_);
+ if (scope.cancelled()) return 0;
+ // NumOutstandingCompilations() does not reflect the units that running
+ // workers are processing, thus add the current worker count to that number.
+ size_t flag_limit =
+ static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks));
+ return std::min(
+ flag_limit,
+ worker_count + scope.compilation_state()->NumOutstandingCompilations());
}
private:
- const std::shared_ptr<BackgroundCompileToken> token_;
+ const std::weak_ptr<NativeModule> native_module_;
const std::shared_ptr<Counters> async_counters_;
- const std::shared_ptr<std::atomic<int>> scheduled_units_approximation_;
- const size_t max_concurrency_;
};
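A sketch of the concurrency computation in {GetMaxConcurrency} above: ask for as many workers as there are outstanding units (plus those already running), capped by {FLAG_wasm_num_compilation_tasks}. The free function is a simplified stand-in:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

// Sketch of the concurrency computation in GetMaxConcurrency above.
std::size_t MaxConcurrency(std::size_t worker_count,
                           std::size_t outstanding_units,
                           int flag_wasm_num_compilation_tasks) {
  std::size_t flag_limit =
      static_cast<std::size_t>(std::max(1, flag_wasm_num_compilation_tasks));
  return std::min(flag_limit, worker_count + outstanding_units);
}

int main() {
  std::printf("%zu\n", MaxConcurrency(2, 100, 8));  // capped at the flag: 8
  std::printf("%zu\n", MaxConcurrency(1, 3, 8));    // workers + pending: 4
}
```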
} // namespace
@@ -1974,7 +1877,8 @@ void AsyncCompileJob::FinishCompile(bool is_after_cache_hit) {
!compilation_state->failed(), // success
native_module_->liftoff_code_size(), // code_size_in_bytes
native_module_->liftoff_bailout_count(), // liftoff_bailout_count
- duration.InMicroseconds() // wall_clock_time_in_us
+ duration.InMicroseconds(), // wall_clock_time_in_us
+ duration.InMicroseconds() // wall_clock_duration_in_us
};
isolate_->metrics_recorder()->DelayMainThreadEvent(event, context_id_);
}
@@ -2489,6 +2393,7 @@ void AsyncStreamingProcessor::FinishAsyncCompileJobWithError(
job_->metrics_event_.module_size_in_bytes = job_->wire_bytes_.length();
job_->metrics_event_.function_count = num_functions_;
job_->metrics_event_.wall_clock_time_in_us = duration.InMicroseconds();
+ job_->metrics_event_.wall_clock_duration_in_us = duration.InMicroseconds();
job_->isolate_->metrics_recorder()->DelayMainThreadEvent(job_->metrics_event_,
job_->context_id_);
@@ -2580,6 +2485,8 @@ bool AsyncStreamingProcessor::ProcessCodeSectionHeader(
return false;
}
+ decoder_.set_code_section(offset, static_cast<uint32_t>(code_section_length));
+
prefix_hash_ = base::hash_combine(prefix_hash_,
static_cast<uint32_t>(code_section_length));
if (!wasm_engine_->GetStreamingCompilationOwnership(prefix_hash_)) {
@@ -2601,7 +2508,6 @@ bool AsyncStreamingProcessor::ProcessCodeSectionHeader(
job_->DoImmediately<AsyncCompileJob::PrepareAndStartCompile>(
decoder_.shared_module(), false, code_size_estimate);
- decoder_.set_code_section(offset, static_cast<uint32_t>(code_section_length));
auto* compilation_state = Impl(job_->native_module_->compilation_state());
compilation_state->SetWireBytesStorage(std::move(wire_bytes_storage));
DCHECK_EQ(job_->native_module_->module()->origin, kWasmOrigin);
@@ -2710,6 +2616,7 @@ void AsyncStreamingProcessor::OnFinishedStream(OwnedVector<uint8_t> bytes) {
job_->metrics_event_.module_size_in_bytes = job_->wire_bytes_.length();
job_->metrics_event_.function_count = num_functions_;
job_->metrics_event_.wall_clock_time_in_us = duration.InMicroseconds();
+ job_->metrics_event_.wall_clock_duration_in_us = duration.InMicroseconds();
job_->isolate_->metrics_recorder()->DelayMainThreadEvent(job_->metrics_event_,
job_->context_id_);
@@ -2804,37 +2711,31 @@ bool AsyncStreamingProcessor::Deserialize(Vector<const uint8_t> module_bytes,
return true;
}
-// TODO(wasm): Try to avoid the {NumberOfWorkerThreads} calls, grow queues
-// dynamically instead.
-int GetMaxCompileConcurrency() {
- int num_worker_threads = V8::GetCurrentPlatform()->NumberOfWorkerThreads();
- return std::min(FLAG_wasm_num_compilation_tasks, num_worker_threads);
-}
-
CompilationStateImpl::CompilationStateImpl(
const std::shared_ptr<NativeModule>& native_module,
std::shared_ptr<Counters> async_counters)
: native_module_(native_module.get()),
- background_compile_token_(
- std::make_shared<BackgroundCompileToken>(native_module)),
+ native_module_weak_(std::move(native_module)),
compile_mode_(FLAG_wasm_tier_up &&
native_module->module()->origin == kWasmOrigin
? CompileMode::kTiering
: CompileMode::kRegular),
async_counters_(std::move(async_counters)),
- max_compile_concurrency_(std::max(GetMaxCompileConcurrency(), 1)),
- // Add one to the allowed number of parallel tasks, because the foreground
- // task sometimes also contributes.
- compilation_unit_queues_(max_compile_concurrency_ + 1,
- native_module->num_functions()) {}
+ compilation_unit_queues_(native_module->num_functions()) {}
void CompilationStateImpl::CancelCompilation() {
- background_compile_token_->Cancel();
// No more callbacks after abort.
base::MutexGuard callbacks_guard(&callbacks_mutex_);
+ // std::memory_order_relaxed is sufficient because no other state is
+ // synchronized with |compile_cancelled_|.
+ compile_cancelled_.store(true, std::memory_order_relaxed);
callbacks_.clear();
}
+bool CompilationStateImpl::cancelled() const {
+ return compile_cancelled_.load(std::memory_order_relaxed);
+}
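A minimal sketch of the new cancellation mechanism: a relaxed atomic flag set once (by {CancelCompilation} or {SetError}) and polled by workers between units, replacing the old {BackgroundCompileToken}:

```cpp
#include <atomic>
#include <cstdio>

// Minimal sketch: a relaxed atomic flag set once and polled by workers.
std::atomic<bool> compile_cancelled{false};

bool cancelled() { return compile_cancelled.load(std::memory_order_relaxed); }

int main() {
  std::printf("%d\n", cancelled());                          // 0
  compile_cancelled.store(true, std::memory_order_relaxed);  // cancel
  std::printf("%d\n", cancelled());                          // 1
}
```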
+
void CompilationStateImpl::InitializeCompilationProgress(
bool lazy_module, int num_import_wrappers, int num_export_wrappers) {
DCHECK(!failed());
@@ -2909,6 +2810,9 @@ void CompilationStateImpl::InitializeCompilationProgressAfterDeserialization() {
RequiredBaselineTierField::encode(ExecutionTier::kTurbofan) |
RequiredTopTierField::encode(ExecutionTier::kTurbofan) |
ReachedTierField::encode(ExecutionTier::kTurbofan);
+ finished_events_.Add(CompilationEvent::kFinishedExportWrappers);
+ finished_events_.Add(CompilationEvent::kFinishedBaselineCompilation);
+ finished_events_.Add(CompilationEvent::kFinishedTopTierCompilation);
compilation_progress_.assign(module->num_declared_functions,
kProgressAfterDeserialization);
}
@@ -2956,7 +2860,9 @@ void CompilationStateImpl::InitializeRecompilation(
// start yet, and new code will be kept tiered-down from the start. For
// streaming compilation, there is a special path to tier down later, when
// the module is complete. In any case, we don't need to recompile here.
+ base::Optional<CompilationUnitBuilder> builder;
if (compilation_progress_.size() > 0) {
+ builder.emplace(native_module_);
const WasmModule* module = native_module_->module();
DCHECK_EQ(module->num_declared_functions, compilation_progress_.size());
DCHECK_GE(module->num_declared_functions,
@@ -2971,15 +2877,13 @@ void CompilationStateImpl::InitializeRecompilation(
: ExecutionTier::kTurbofan;
int imported = module->num_imported_functions;
// Generate necessary compilation units on the fly.
- CompilationUnitBuilder builder(native_module_);
for (int function_index : recompile_function_indexes) {
DCHECK_LE(imported, function_index);
int slot_index = function_index - imported;
auto& progress = compilation_progress_[slot_index];
progress = MissingRecompilationField::update(progress, true);
- builder.AddRecompilationUnit(function_index, new_tier);
+ builder->AddRecompilationUnit(function_index, new_tier);
}
- builder.Commit();
}
// Trigger callback if module needs no recompilation.
@@ -2987,6 +2891,12 @@ void CompilationStateImpl::InitializeRecompilation(
TriggerCallbacks(base::EnumSet<CompilationEvent>(
{CompilationEvent::kFinishedRecompilation}));
}
+
+ if (builder.has_value()) {
+ // Avoid holding lock while scheduling a compile job.
+ guard.reset();
+ builder->Commit();
+ }
}
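A sketch of the "avoid holding lock while scheduling" pattern from the comment above; {std::optional} plays the role of the optional mutex guard, and the names are illustrative:

```cpp
#include <cstdio>
#include <mutex>
#include <optional>

std::mutex callbacks_mutex;

void ScheduleCompileJob() { std::printf("scheduling without the lock held\n"); }

int main() {
  std::optional<std::lock_guard<std::mutex>> guard;
  guard.emplace(callbacks_mutex);
  // ... mutate state protected by callbacks_mutex ...
  guard.reset();         // release the lock first,
  ScheduleCompileJob();  // then do follow-up work that may take other locks
}
```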
void CompilationStateImpl::AddCallback(CompilationState::callback_t callback) {
@@ -3017,13 +2927,15 @@ void CompilationStateImpl::AddCompilationUnits(
compilation_unit_queues_.AddUnits(baseline_units, top_tier_units,
native_module_->module());
}
- js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
- js_to_wasm_wrapper_units.begin(),
- js_to_wasm_wrapper_units.end());
-
- size_t total_units = baseline_units.size() + top_tier_units.size() +
- js_to_wasm_wrapper_units.size();
- ScheduleCompileJobForNewUnits(static_cast<int>(total_units));
+ if (!js_to_wasm_wrapper_units.empty()) {
+ // |js_to_wasm_wrapper_units_| can only be modified before background
+ // compilation started.
+ DCHECK(!current_compile_job_ || !current_compile_job_->IsRunning());
+ js_to_wasm_wrapper_units_.insert(js_to_wasm_wrapper_units_.end(),
+ js_to_wasm_wrapper_units.begin(),
+ js_to_wasm_wrapper_units.end());
+ }
+ ScheduleCompileJobForNewUnits();
}
void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
@@ -3033,7 +2945,7 @@ void CompilationStateImpl::AddTopTierCompilationUnit(WasmCompilationUnit unit) {
void CompilationStateImpl::AddTopTierPriorityCompilationUnit(
WasmCompilationUnit unit, size_t priority) {
compilation_unit_queues_.AddTopTierPriorityUnit(unit, priority);
- ScheduleCompileJobForNewUnits(1);
+ ScheduleCompileJobForNewUnits();
}
std::shared_ptr<JSToWasmWrapperCompilationUnit>
@@ -3055,7 +2967,7 @@ void CompilationStateImpl::FinalizeJSToWasmWrappers(
// optimization we keep the code space unlocked to avoid repeated unlocking
// because many such wrapper are allocated in sequence below.
TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"),
- "wasm.FinalizeJSToWasmWrappers", "num_wrappers",
+ "wasm.FinalizeJSToWasmWrappers", "wrappers",
js_to_wasm_wrapper_units_.size());
CodeSpaceMemoryModificationScope modification_scope(isolate->heap());
for (auto& unit : js_to_wasm_wrapper_units_) {
@@ -3067,15 +2979,20 @@ void CompilationStateImpl::FinalizeJSToWasmWrappers(
}
}
+CompilationUnitQueues::Queue* CompilationStateImpl::GetQueueForCompileTask(
+ int task_id) {
+ return compilation_unit_queues_.GetQueueForTask(task_id);
+}
+
base::Optional<WasmCompilationUnit>
CompilationStateImpl::GetNextCompilationUnit(
- int task_id, CompileBaselineOnly baseline_only) {
- return compilation_unit_queues_.GetNextUnit(task_id, baseline_only);
+ CompilationUnitQueues::Queue* queue, CompileBaselineOnly baseline_only) {
+ return compilation_unit_queues_.GetNextUnit(queue, baseline_only);
}
void CompilationStateImpl::OnFinishedUnits(Vector<WasmCode*> code_vector) {
TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("v8.wasm.detailed"),
- "wasm.OnFinishedUnits", "num_units", code_vector.size());
+ "wasm.OnFinishedUnits", "units", code_vector.size());
base::MutexGuard guard(&callbacks_mutex_);
@@ -3230,24 +3147,7 @@ void CompilationStateImpl::TriggerCallbacks(
}
}
-int CompilationStateImpl::GetUnpublishedUnitsLimits(int task_id) {
- // We want background threads to publish regularly (to avoid contention when
- // they are all publishing at the end). On the other side, each publishing has
- // some overhead (part of it for synchronizing between threads), so it should
- // not happen *too* often.
- // Thus aim for 4-8 publishes per thread, but distribute it such that
- // publishing is likely to happen at different times.
- int units_per_thread =
- static_cast<int>(native_module_->module()->num_declared_functions /
- max_compile_concurrency_);
- int min = units_per_thread / 8;
- // Return something between {min} and {2*min}, but not smaller than {10}.
- return std::max(10, min + (min * task_id / max_compile_concurrency_));
-}
-
-void CompilationStateImpl::OnCompilationStopped(int task_id,
- const WasmFeatures& detected) {
- DCHECK_GE(max_compile_concurrency_, task_id);
+void CompilationStateImpl::OnCompilationStopped(const WasmFeatures& detected) {
base::MutexGuard guard(&mutex_);
detected_features_.Add(detected);
}
@@ -3260,40 +3160,104 @@ void CompilationStateImpl::PublishDetectedFeatures(Isolate* isolate) {
UpdateFeatureUseCounts(isolate, detected_features_);
}
-void CompilationStateImpl::ScheduleCompileJobForNewUnits(int new_units) {
- // Increase the {scheduled_units_approximation_} counter and remember the old
- // value to check whether it increased towards {max_compile_concurrency_}.
- // In that case, we need to notify the compile job about the increased
- // concurrency.
- DCHECK_LT(0, new_units);
- int old_units = scheduled_units_approximation_->fetch_add(
- new_units, std::memory_order_relaxed);
- bool concurrency_increased = old_units < max_compile_concurrency_;
+void CompilationStateImpl::PublishCompilationResults(
+ std::vector<std::unique_ptr<WasmCode>> unpublished_code) {
+ if (unpublished_code.empty()) return;
- base::MutexGuard guard(&mutex_);
- if (current_compile_job_ && current_compile_job_->IsRunning()) {
- if (concurrency_increased) {
- current_compile_job_->NotifyConcurrencyIncrease();
+ // For import wrapper compilation units, add result to the cache.
+ int num_imported_functions = native_module_->num_imported_functions();
+ WasmImportWrapperCache* cache = native_module_->import_wrapper_cache();
+ for (const auto& code : unpublished_code) {
+ int func_index = code->index();
+ DCHECK_LE(0, func_index);
+ DCHECK_LT(func_index, native_module_->num_functions());
+ if (func_index < num_imported_functions) {
+ const FunctionSig* sig =
+ native_module_->module()->functions[func_index].sig;
+ WasmImportWrapperCache::CacheKey key(
+ compiler::kDefaultImportCallKind, sig,
+ static_cast<int>(sig->parameter_count()));
+ // If two imported functions have the same key, only one of them should
+ // have been added as a compilation unit. So it is always the first time
+ // we compile a wrapper for this key here.
+ DCHECK_NULL((*cache)[key]);
+ (*cache)[key] = code.get();
+ code->IncRef();
}
- return;
}
+ PublishCode(VectorOf(unpublished_code));
+}
+
+void CompilationStateImpl::PublishCode(Vector<std::unique_ptr<WasmCode>> code) {
+ WasmCodeRefScope code_ref_scope;
+ std::vector<WasmCode*> published_code =
+ native_module_->PublishCode(std::move(code));
+ // Defer logging code in case wire bytes were not fully received yet.
+ if (native_module_->HasWireBytes()) {
+ native_module_->engine()->LogCode(VectorOf(published_code));
+ }
+
+ OnFinishedUnits(VectorOf(std::move(published_code)));
+}
+
+void CompilationStateImpl::SchedulePublishCompilationResults(
+ std::vector<std::unique_ptr<WasmCode>> unpublished_code) {
+ {
+ base::MutexGuard guard(&publish_mutex_);
+ if (publisher_running_) {
+ // Add new code to the queue and return.
+ publish_queue_.reserve(publish_queue_.size() + unpublished_code.size());
+ for (auto& c : unpublished_code) {
+ publish_queue_.emplace_back(std::move(c));
+ }
+ return;
+ }
+ publisher_running_ = true;
+ }
+ while (true) {
+ PublishCompilationResults(std::move(unpublished_code));
+ unpublished_code.clear();
+
+ // Keep publishing new code that came in.
+ base::MutexGuard guard(&publish_mutex_);
+ DCHECK(publisher_running_);
+ if (publish_queue_.empty()) {
+ publisher_running_ = false;
+ return;
+ }
+ unpublished_code.swap(publish_queue_);
+ }
+}
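A standalone sketch of the single-publisher hand-off implemented by {SchedulePublishCompilationResults}: whichever thread finds no publisher running becomes the publisher and drains whatever other threads enqueue in the meantime. Strings stand in for {WasmCode} objects:

```cpp
#include <cstdio>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

std::mutex publish_mutex;
std::vector<std::string> publish_queue;
bool publisher_running = false;

void PublishResults(std::vector<std::string> results) {
  for (auto& r : results) std::printf("published %s\n", r.c_str());
}

void SchedulePublish(std::vector<std::string> unpublished) {
  {
    std::lock_guard<std::mutex> guard(publish_mutex);
    if (publisher_running) {
      // Someone else is publishing; hand our results over and return.
      for (auto& c : unpublished) publish_queue.emplace_back(std::move(c));
      return;
    }
    publisher_running = true;
  }
  while (true) {
    PublishResults(std::move(unpublished));
    unpublished.clear();

    // Keep publishing whatever came in while we were busy.
    std::lock_guard<std::mutex> guard(publish_mutex);
    if (publish_queue.empty()) {
      publisher_running = false;
      return;
    }
    unpublished.swap(publish_queue);
  }
}

int main() { SchedulePublish({"code-1", "code-2"}); }
```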
+
+void CompilationStateImpl::ScheduleCompileJobForNewUnits() {
if (failed()) return;
- std::unique_ptr<JobTask> new_compile_job =
- std::make_unique<BackgroundCompileJob>(
- background_compile_token_, async_counters_,
- scheduled_units_approximation_, max_compile_concurrency_);
- // TODO(wasm): Lower priority for TurboFan-only jobs.
- std::shared_ptr<JobHandle> handle = V8::GetCurrentPlatform()->PostJob(
- has_priority_ ? TaskPriority::kUserBlocking : TaskPriority::kUserVisible,
- std::move(new_compile_job));
- native_module_->engine()->ShepherdCompileJobHandle(handle);
- current_compile_job_ =
- std::make_unique<ThreadSafeJobHandle>(std::move(handle));
+ std::shared_ptr<JobHandle> new_job_handle;
+ {
+ base::MutexGuard guard(&mutex_);
+ if (current_compile_job_ && current_compile_job_->IsValid()) {
+ current_compile_job_->NotifyConcurrencyIncrease();
+ return;
+ }
+
+ std::unique_ptr<JobTask> new_compile_job =
+ std::make_unique<BackgroundCompileJob>(native_module_weak_,
+ async_counters_);
+ // TODO(wasm): Lower priority for TurboFan-only jobs.
+ new_job_handle = V8::GetCurrentPlatform()->PostJob(
+ has_priority_ ? TaskPriority::kUserBlocking
+ : TaskPriority::kUserVisible,
+ std::move(new_compile_job));
+ current_compile_job_ = new_job_handle;
+ // Reset the priority. Later uses of the compilation state, e.g. for
+ // debugging, should compile with the default priority again.
+ has_priority_ = false;
+ }
- // Reset the priority. Later uses of the compilation state, e.g. for
- // debugging, should compile with the default priority again.
- has_priority_ = false;
+ if (new_job_handle) {
+ native_module_->engine()->ShepherdCompileJobHandle(
+ std::move(new_job_handle));
+ }
}
size_t CompilationStateImpl::NumOutstandingCompilations() const {
@@ -3307,12 +3271,14 @@ size_t CompilationStateImpl::NumOutstandingCompilations() const {
}
void CompilationStateImpl::SetError() {
+ compile_cancelled_.store(true, std::memory_order_relaxed);
if (compile_failed_.exchange(true, std::memory_order_relaxed)) {
return; // Already failed before.
}
base::MutexGuard callbacks_guard(&callbacks_mutex_);
TriggerCallbacks();
+ callbacks_.clear();
}
void CompilationStateImpl::WaitForCompilationEvent(
@@ -3330,7 +3296,7 @@ void CompilationStateImpl::WaitForCompilationEvent(
}
constexpr JobDelegate* kNoDelegate = nullptr;
- ExecuteCompilationUnits(background_compile_token_, async_counters_.get(),
+ ExecuteCompilationUnits(native_module_weak_, async_counters_.get(),
kNoDelegate, kBaselineOnly);
compilation_event_semaphore->Wait();
}
@@ -3350,7 +3316,6 @@ class CompileJSToWasmWrapperJob final : public JobTask {
size_t max_concurrency)
: queue_(queue),
compilation_units_(compilation_units),
- max_concurrency_(max_concurrency),
outstanding_units_(queue->size()) {}
void Run(JobDelegate* delegate) override {
@@ -3366,14 +3331,15 @@ class CompileJSToWasmWrapperJob final : public JobTask {
// {outstanding_units_} includes the units that other workers are currently
// working on, so we can safely ignore the {worker_count} and just return
// the current number of outstanding units.
- return std::min(max_concurrency_,
+ size_t flag_limit =
+ static_cast<size_t>(std::max(1, FLAG_wasm_num_compilation_tasks));
+ return std::min(flag_limit,
outstanding_units_.load(std::memory_order_relaxed));
}
private:
JSToWasmWrapperQueue* const queue_;
JSToWasmWrapperUnitMap* const compilation_units_;
- const size_t max_concurrency_;
std::atomic<size_t> outstanding_units_;
};
} // namespace
@@ -3395,7 +3361,8 @@ void CompileJsToWasmWrappers(Isolate* isolate, const WasmModule* module,
if (queue.insert(key)) {
auto unit = std::make_unique<JSToWasmWrapperCompilationUnit>(
isolate, isolate->wasm_engine(), function.sig, module,
- function.imported, enabled_features);
+ function.imported, enabled_features,
+ JSToWasmWrapperCompilationUnit::kAllowGeneric);
compilation_units.emplace(key, std::move(unit));
}
}