diff options
author | Gus Caplan <me@gus.host> | 2022-04-23 19:02:11 -0500 |
---|---|---|
committer | Gus Caplan <me@gus.host> | 2022-04-23 19:02:11 -0500 |
commit | f9e0c4d5d22bb9a1bc2455c6f862e045381d2b7c (patch) | |
tree | 9427493cb34ab20b8f812a6cd216418016a923e4 | |
parent | e2ff1c3b2c40874a41c0c75f2551420af78b96bf (diff) | |
download | node-new-f9e0c4d5d22bb9a1bc2455c6f862e045381d2b7c.tar.gz |
src,v8: add wasm streaming handler api
-rw-r--r-- | lib/internal/bootstrap/pre_execution.js | 17 | ||||
-rw-r--r-- | lib/internal/v8.js | 21 | ||||
-rw-r--r-- | lib/v8.js | 2 | ||||
-rw-r--r-- | src/env.h | 1 | ||||
-rw-r--r-- | src/node_wasm_web_api.cc | 84 | ||||
-rw-r--r-- | src/node_wasm_web_api.h | 2 |
6 files changed, 127 insertions, 0 deletions
diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js index e1b882b6db..3724e9fb9a 100644 --- a/lib/internal/bootstrap/pre_execution.js +++ b/lib/internal/bootstrap/pre_execution.js @@ -245,6 +245,16 @@ function setupFetch() { throw new ERR_WEBASSEMBLY_RESPONSE('body has already been used'); } + const handler = require('internal/v8').getWasmStreamingHandler(); + const cached = handler?.get(response.url); + if (cached !== undefined && cache !== null) { + if (streamState.setCompiledModuleBytes(cached)) { + return; + } else { + handler.delete(response.url); + } + } + // Pass all data from the response body to the WebAssembly compiler. for await (const chunk of response.body) { streamState.push(chunk); @@ -257,6 +267,13 @@ function setupFetch() { // and usable Response or because a network error occurred. streamState.abort(err); }); + }, (url, buffer) => { + const handler = require('internal/v8').getWasmStreamingHandler(); + try { + handler?.set(url, buffer); + } catch (e) { + triggerUncaughtException(e, false); + } }); } diff --git a/lib/internal/v8.js b/lib/internal/v8.js new file mode 100644 index 0000000000..38d9a0c79b --- /dev/null +++ b/lib/internal/v8.js @@ -0,0 +1,21 @@ +'use strict'; + +let streamingHandler; +function setWasmStreamingHandler(handler) { + if (handler !== undefined && handler !== null) { + validateObject(handler, 'handler'); + validateFunction(handler.get, 'handler.get'); + validateFunction(handler.set, 'handler.set'); + validateFunction(handler.delete, 'handler.delete'); + } + streamingHandler = handler; +} + +function getWasmStreamingHandler() { + return streamingHandler; +} + +module.exports = { + setWasmStreamingHandler, + getWasmStreamingHandler, +}; @@ -58,6 +58,7 @@ const { } = internalBinding('heap_utils'); const { HeapSnapshotStream } = require('internal/heap_utils'); const promiseHooks = require('internal/promise_hooks'); +const { setWasmStreamingHandler } = require('internal/v8'); /** * Generates a snapshot of the current V8 heap @@ -369,4 +370,5 @@ module.exports = { serialize, writeHeapSnapshot, promiseHooks, + setWasmStreamingHandler, }; @@ -552,6 +552,7 @@ constexpr size_t kFsStatsBufferLength = V(udp_constructor_function, v8::Function) \ V(url_constructor_function, v8::Function) \ V(wasm_streaming_compilation_impl, v8::Function) \ + V(wasm_streaming_compilation_callback, v8::Function) \ V(wasm_streaming_object_constructor, v8::Function) class Environment; diff --git a/src/node_wasm_web_api.cc b/src/node_wasm_web_api.cc index b23096120b..9a5adc0db5 100644 --- a/src/node_wasm_web_api.cc +++ b/src/node_wasm_web_api.cc @@ -8,6 +8,8 @@ namespace wasm_web_api { using v8::ArrayBuffer; using v8::ArrayBufferView; +using v8::BackingStore; +using v8::CompiledWasmModule; using v8::Context; using v8::Function; using v8::FunctionCallbackInfo; @@ -15,6 +17,8 @@ using v8::FunctionTemplate; using v8::Local; using v8::MaybeLocal; using v8::Object; +using v8::OwnedBuffer; +using v8::String; using v8::Value; using v8::WasmStreaming; @@ -32,6 +36,7 @@ Local<Function> WasmStreamingObject::Initialize(Environment* env) { env->SetProtoMethod(t, "push", Push); env->SetProtoMethod(t, "finish", Finish); env->SetProtoMethod(t, "abort", Abort); + env->SetProtoMethod(t, "setCompiledModuleBytes", SetCompiledModuleBytes); auto function = t->GetFunction(env->context()).ToLocalChecked(); env->set_wasm_streaming_object_constructor(function); @@ -43,6 +48,7 @@ void WasmStreamingObject::RegisterExternalReferences( registry->Register(Push); registry->Register(Finish); registry->Register(Abort); + registry->Register(SetCompiledModuleBytes); } void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const { @@ -52,6 +58,41 @@ void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackFieldWithSize("streaming", wasm_size_); } +class WasmStreamingClient : public WasmStreaming::Client { + public: + explicit WasmStreamingClient(Environment* env) : env_(env) {} + + private: + void OnModuleCompiled(CompiledWasmModule compiled_module) { + env_->SetImmediateThreadsafe([compiled_module](Environment* env) mutable { + OwnedBuffer owned = compiled_module.Serialize(); + if (owned.size == 0) { + return; + } + + std::shared_ptr<BackingStore> store = + ArrayBuffer::NewBackingStore(env->isolate(), owned.size); + unsigned char* dest = static_cast<unsigned char*>(store->Data()); + memcpy(dest, &owned.buffer, owned.size); + Local<ArrayBuffer> ab = ArrayBuffer::New(env->isolate(), store); + Local<String> url = + String::NewFromUtf8(env->isolate(), + compiled_module.source_url().c_str(), + {}, + compiled_module.source_url().size()) + .ToLocalChecked(); + + Local<Value> args[] = {url, ab}; + env->wasm_streaming_compilation_callback() + ->Call( + env->context(), Undefined(env->isolate()), arraysize(args), args) + .ToLocalChecked(); + }); + } + + Environment* env_; +}; + MaybeLocal<Object> WasmStreamingObject::Create( Environment* env, std::shared_ptr<WasmStreaming> streaming) { Local<Function> ctor = Initialize(env); @@ -66,6 +107,9 @@ MaybeLocal<Object> WasmStreamingObject::Create( CHECK_NOT_NULL(ptr); ptr->streaming_ = streaming; ptr->wasm_size_ = 0; + + ptr->streaming_->SetClient(std::make_shared<WasmStreamingClient>(env)); + return obj; } @@ -111,6 +155,45 @@ void WasmStreamingObject::Push(const FunctionCallbackInfo<Value>& args) { obj->wasm_size_ += size; } +void WasmStreamingObject::SetCompiledModuleBytes( + const FunctionCallbackInfo<Value>& args) { + WasmStreamingObject* obj; + ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder()); + CHECK(obj->streaming_); + + CHECK_EQ(args.Length(), 1); + Local<Value> chunk = args[0]; + + // The start of the memory section backing the ArrayBuffer(View), the offset + // of the ArrayBuffer(View) within the memory section, and its size in bytes. + const void* bytes; + size_t offset; + size_t size; + + if (LIKELY(chunk->IsArrayBufferView())) { + Local<ArrayBufferView> view = chunk.As<ArrayBufferView>(); + bytes = view->Buffer()->GetBackingStore()->Data(); + offset = view->ByteOffset(); + size = view->ByteLength(); + } else if (LIKELY(chunk->IsArrayBuffer())) { + Local<ArrayBuffer> buffer = chunk.As<ArrayBuffer>(); + bytes = buffer->GetBackingStore()->Data(); + offset = 0; + size = buffer->ByteLength(); + } else { + return node::THROW_ERR_INVALID_ARG_TYPE( + Environment::GetCurrent(args), + "chunk must be an ArrayBufferView or an ArrayBuffer"); + } + + bool bytes_used = obj->streaming_->SetCompiledModuleBytes( + static_cast<const uint8_t*>(bytes) + offset, size); + if (bytes_used) { + obj->wasm_size_ += size; + } + args.GetReturnValue().Set(bytes_used); +} + void WasmStreamingObject::Finish(const FunctionCallbackInfo<Value>& args) { WasmStreamingObject* obj; ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder()); @@ -174,6 +257,7 @@ void StartStreamingCompilation(const FunctionCallbackInfo<Value>& info) { void SetImplementation(const FunctionCallbackInfo<Value>& info) { Environment* env = Environment::GetCurrent(info); env->set_wasm_streaming_compilation_impl(info[0].As<Function>()); + env->set_wasm_streaming_compilation_callback(info[1].As<Function>()); } void Initialize(Local<Object> target, diff --git a/src/node_wasm_web_api.h b/src/node_wasm_web_api.h index 9f5fe86816..b979087eca 100644 --- a/src/node_wasm_web_api.h +++ b/src/node_wasm_web_api.h @@ -36,6 +36,8 @@ class WasmStreamingObject final : public BaseObject { static void Push(const v8::FunctionCallbackInfo<v8::Value>& args); static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args); static void Abort(const v8::FunctionCallbackInfo<v8::Value>& args); + static void SetCompiledModuleBytes( + const v8::FunctionCallbackInfo<v8::Value>& args); std::shared_ptr<v8::WasmStreaming> streaming_; size_t wasm_size_; |