summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGus Caplan <me@gus.host>2022-04-23 19:02:11 -0500
committerGus Caplan <me@gus.host>2022-04-23 19:02:11 -0500
commitf9e0c4d5d22bb9a1bc2455c6f862e045381d2b7c (patch)
tree9427493cb34ab20b8f812a6cd216418016a923e4
parente2ff1c3b2c40874a41c0c75f2551420af78b96bf (diff)
downloadnode-new-f9e0c4d5d22bb9a1bc2455c6f862e045381d2b7c.tar.gz
src,v8: add wasm streaming handler api
-rw-r--r--lib/internal/bootstrap/pre_execution.js17
-rw-r--r--lib/internal/v8.js21
-rw-r--r--lib/v8.js2
-rw-r--r--src/env.h1
-rw-r--r--src/node_wasm_web_api.cc84
-rw-r--r--src/node_wasm_web_api.h2
6 files changed, 127 insertions, 0 deletions
diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js
index e1b882b6db..3724e9fb9a 100644
--- a/lib/internal/bootstrap/pre_execution.js
+++ b/lib/internal/bootstrap/pre_execution.js
@@ -245,6 +245,16 @@ function setupFetch() {
throw new ERR_WEBASSEMBLY_RESPONSE('body has already been used');
}
+ const handler = require('internal/v8').getWasmStreamingHandler();
+ const cached = handler?.get(response.url);
+ if (cached !== undefined && cache !== null) {
+ if (streamState.setCompiledModuleBytes(cached)) {
+ return;
+ } else {
+ handler.delete(response.url);
+ }
+ }
+
// Pass all data from the response body to the WebAssembly compiler.
for await (const chunk of response.body) {
streamState.push(chunk);
@@ -257,6 +267,13 @@ function setupFetch() {
// and usable Response or because a network error occurred.
streamState.abort(err);
});
+ }, (url, buffer) => {
+ const handler = require('internal/v8').getWasmStreamingHandler();
+ try {
+ handler?.set(url, buffer);
+ } catch (e) {
+ triggerUncaughtException(e, false);
+ }
});
}
diff --git a/lib/internal/v8.js b/lib/internal/v8.js
new file mode 100644
index 0000000000..38d9a0c79b
--- /dev/null
+++ b/lib/internal/v8.js
@@ -0,0 +1,21 @@
+'use strict';
+
+let streamingHandler;
+function setWasmStreamingHandler(handler) {
+ if (handler !== undefined && handler !== null) {
+ validateObject(handler, 'handler');
+ validateFunction(handler.get, 'handler.get');
+ validateFunction(handler.set, 'handler.set');
+ validateFunction(handler.delete, 'handler.delete');
+ }
+ streamingHandler = handler;
+}
+
+function getWasmStreamingHandler() {
+ return streamingHandler;
+}
+
+module.exports = {
+ setWasmStreamingHandler,
+ getWasmStreamingHandler,
+};
diff --git a/lib/v8.js b/lib/v8.js
index e899da4684..ce90a63332 100644
--- a/lib/v8.js
+++ b/lib/v8.js
@@ -58,6 +58,7 @@ const {
} = internalBinding('heap_utils');
const { HeapSnapshotStream } = require('internal/heap_utils');
const promiseHooks = require('internal/promise_hooks');
+const { setWasmStreamingHandler } = require('internal/v8');
/**
* Generates a snapshot of the current V8 heap
@@ -369,4 +370,5 @@ module.exports = {
serialize,
writeHeapSnapshot,
promiseHooks,
+ setWasmStreamingHandler,
};
diff --git a/src/env.h b/src/env.h
index 7e35833e45..b4c7c443a8 100644
--- a/src/env.h
+++ b/src/env.h
@@ -552,6 +552,7 @@ constexpr size_t kFsStatsBufferLength =
V(udp_constructor_function, v8::Function) \
V(url_constructor_function, v8::Function) \
V(wasm_streaming_compilation_impl, v8::Function) \
+ V(wasm_streaming_compilation_callback, v8::Function) \
V(wasm_streaming_object_constructor, v8::Function)
class Environment;
diff --git a/src/node_wasm_web_api.cc b/src/node_wasm_web_api.cc
index b23096120b..9a5adc0db5 100644
--- a/src/node_wasm_web_api.cc
+++ b/src/node_wasm_web_api.cc
@@ -8,6 +8,8 @@ namespace wasm_web_api {
using v8::ArrayBuffer;
using v8::ArrayBufferView;
+using v8::BackingStore;
+using v8::CompiledWasmModule;
using v8::Context;
using v8::Function;
using v8::FunctionCallbackInfo;
@@ -15,6 +17,8 @@ using v8::FunctionTemplate;
using v8::Local;
using v8::MaybeLocal;
using v8::Object;
+using v8::OwnedBuffer;
+using v8::String;
using v8::Value;
using v8::WasmStreaming;
@@ -32,6 +36,7 @@ Local<Function> WasmStreamingObject::Initialize(Environment* env) {
env->SetProtoMethod(t, "push", Push);
env->SetProtoMethod(t, "finish", Finish);
env->SetProtoMethod(t, "abort", Abort);
+ env->SetProtoMethod(t, "setCompiledModuleBytes", SetCompiledModuleBytes);
auto function = t->GetFunction(env->context()).ToLocalChecked();
env->set_wasm_streaming_object_constructor(function);
@@ -43,6 +48,7 @@ void WasmStreamingObject::RegisterExternalReferences(
registry->Register(Push);
registry->Register(Finish);
registry->Register(Abort);
+ registry->Register(SetCompiledModuleBytes);
}
void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const {
@@ -52,6 +58,41 @@ void WasmStreamingObject::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackFieldWithSize("streaming", wasm_size_);
}
+class WasmStreamingClient : public WasmStreaming::Client {
+ public:
+ explicit WasmStreamingClient(Environment* env) : env_(env) {}
+
+ private:
+ void OnModuleCompiled(CompiledWasmModule compiled_module) {
+ env_->SetImmediateThreadsafe([compiled_module](Environment* env) mutable {
+ OwnedBuffer owned = compiled_module.Serialize();
+ if (owned.size == 0) {
+ return;
+ }
+
+ std::shared_ptr<BackingStore> store =
+ ArrayBuffer::NewBackingStore(env->isolate(), owned.size);
+ unsigned char* dest = static_cast<unsigned char*>(store->Data());
+ memcpy(dest, &owned.buffer, owned.size);
+ Local<ArrayBuffer> ab = ArrayBuffer::New(env->isolate(), store);
+ Local<String> url =
+ String::NewFromUtf8(env->isolate(),
+ compiled_module.source_url().c_str(),
+ {},
+ compiled_module.source_url().size())
+ .ToLocalChecked();
+
+ Local<Value> args[] = {url, ab};
+ env->wasm_streaming_compilation_callback()
+ ->Call(
+ env->context(), Undefined(env->isolate()), arraysize(args), args)
+ .ToLocalChecked();
+ });
+ }
+
+ Environment* env_;
+};
+
MaybeLocal<Object> WasmStreamingObject::Create(
Environment* env, std::shared_ptr<WasmStreaming> streaming) {
Local<Function> ctor = Initialize(env);
@@ -66,6 +107,9 @@ MaybeLocal<Object> WasmStreamingObject::Create(
CHECK_NOT_NULL(ptr);
ptr->streaming_ = streaming;
ptr->wasm_size_ = 0;
+
+ ptr->streaming_->SetClient(std::make_shared<WasmStreamingClient>(env));
+
return obj;
}
@@ -111,6 +155,45 @@ void WasmStreamingObject::Push(const FunctionCallbackInfo<Value>& args) {
obj->wasm_size_ += size;
}
+void WasmStreamingObject::SetCompiledModuleBytes(
+ const FunctionCallbackInfo<Value>& args) {
+ WasmStreamingObject* obj;
+ ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder());
+ CHECK(obj->streaming_);
+
+ CHECK_EQ(args.Length(), 1);
+ Local<Value> chunk = args[0];
+
+ // The start of the memory section backing the ArrayBuffer(View), the offset
+ // of the ArrayBuffer(View) within the memory section, and its size in bytes.
+ const void* bytes;
+ size_t offset;
+ size_t size;
+
+ if (LIKELY(chunk->IsArrayBufferView())) {
+ Local<ArrayBufferView> view = chunk.As<ArrayBufferView>();
+ bytes = view->Buffer()->GetBackingStore()->Data();
+ offset = view->ByteOffset();
+ size = view->ByteLength();
+ } else if (LIKELY(chunk->IsArrayBuffer())) {
+ Local<ArrayBuffer> buffer = chunk.As<ArrayBuffer>();
+ bytes = buffer->GetBackingStore()->Data();
+ offset = 0;
+ size = buffer->ByteLength();
+ } else {
+ return node::THROW_ERR_INVALID_ARG_TYPE(
+ Environment::GetCurrent(args),
+ "chunk must be an ArrayBufferView or an ArrayBuffer");
+ }
+
+ bool bytes_used = obj->streaming_->SetCompiledModuleBytes(
+ static_cast<const uint8_t*>(bytes) + offset, size);
+ if (bytes_used) {
+ obj->wasm_size_ += size;
+ }
+ args.GetReturnValue().Set(bytes_used);
+}
+
void WasmStreamingObject::Finish(const FunctionCallbackInfo<Value>& args) {
WasmStreamingObject* obj;
ASSIGN_OR_RETURN_UNWRAP(&obj, args.Holder());
@@ -174,6 +257,7 @@ void StartStreamingCompilation(const FunctionCallbackInfo<Value>& info) {
void SetImplementation(const FunctionCallbackInfo<Value>& info) {
Environment* env = Environment::GetCurrent(info);
env->set_wasm_streaming_compilation_impl(info[0].As<Function>());
+ env->set_wasm_streaming_compilation_callback(info[1].As<Function>());
}
void Initialize(Local<Object> target,
diff --git a/src/node_wasm_web_api.h b/src/node_wasm_web_api.h
index 9f5fe86816..b979087eca 100644
--- a/src/node_wasm_web_api.h
+++ b/src/node_wasm_web_api.h
@@ -36,6 +36,8 @@ class WasmStreamingObject final : public BaseObject {
static void Push(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Abort(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void SetCompiledModuleBytes(
+ const v8::FunctionCallbackInfo<v8::Value>& args);
std::shared_ptr<v8::WasmStreaming> streaming_;
size_t wasm_size_;