diff options
| author | Gabriel Schulhof <gabriel.schulhof@intel.com> | 2018-11-17 12:34:54 -0800 |
|---|---|---|
| committer | Gabriel Schulhof <gabriel.schulhof@intel.com> | 2018-12-04 13:58:17 -0800 |
| commit | 938e11882b96e19b443477571455088baaa054d8 (patch) | |
| tree | eb828a60957a2881995ba9a83f44a32a18fbff16 /test/node-api/test_threadsafe_function | |
| parent | 83ee137c4565112177f22f2c735b266b22262220 (diff) | |
| download | node-new-938e11882b96e19b443477571455088baaa054d8.tar.gz | |
test: partition N-API tests
Partition test/addons-napi into test/js-native-api and test/node-api to
isolate the Node.js-agnostic portion of the N-API tests from the
Node.js-specific portion.
PR-URL: https://github.com/nodejs/node/pull/24557
Reviewed-By: Refael Ackermann <refack@gmail.com>
Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com>
Diffstat (limited to 'test/node-api/test_threadsafe_function')
| -rw-r--r-- | test/node-api/test_threadsafe_function/binding.c | 283 | ||||
| -rw-r--r-- | test/node-api/test_threadsafe_function/binding.gyp | 8 | ||||
| -rw-r--r-- | test/node-api/test_threadsafe_function/test.js | 206 |
3 files changed, 497 insertions, 0 deletions
diff --git a/test/node-api/test_threadsafe_function/binding.c b/test/node-api/test_threadsafe_function/binding.c new file mode 100644 index 0000000000..c985178c5a --- /dev/null +++ b/test/node-api/test_threadsafe_function/binding.c @@ -0,0 +1,283 @@ +// For the purpose of this test we use libuv's threading library. When deciding +// on a threading library for a new project it bears remembering that in the +// future libuv may introduce API changes which may render it non-ABI-stable, +// which, in turn, may affect the ABI stability of the project despite its use +// of N-API. +#include <uv.h> +#define NAPI_EXPERIMENTAL +#include <node_api.h> +#include "../../js-native-api/common.h" + +#define ARRAY_LENGTH 10 +#define MAX_QUEUE_SIZE 2 + +static uv_thread_t uv_threads[2]; +static napi_threadsafe_function ts_fn; + +typedef struct { + napi_threadsafe_function_call_mode block_on_full; + napi_threadsafe_function_release_mode abort; + bool start_secondary; + napi_ref js_finalize_cb; + uint32_t max_queue_size; +} ts_fn_hint; + +static ts_fn_hint ts_info; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void secondary_thread(void* data) { + napi_threadsafe_function ts_fn = data; + + if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Source thread producing the data +static void data_source_thread(void* data) { + napi_threadsafe_function ts_fn = data; + int index; + void* hint; + ts_fn_hint *ts_fn_info; + napi_status status; + bool queue_was_full = false; + bool queue_was_closing = false; + + if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH); + } + + ts_fn_info = (ts_fn_hint *)hint; + + if (ts_fn_info != &ts_info) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH); + } + + if (ts_fn_info->start_secondary) { + if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + + if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "failed to start secondary thread", NAPI_AUTO_LENGTH); + } + } + + for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { + status = napi_call_threadsafe_function(ts_fn, &ints[index], + ts_fn_info->block_on_full); + if (ts_fn_info->max_queue_size == 0) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + uint64_t start = uv_hrtime(); + for (; uv_hrtime() - start < 200000000;); + } + switch (status) { + case napi_queue_full: + queue_was_full = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queue_was_closing = true; + break; + + default: + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + } + + // Assert that the enqueuing of a value was refused at least once, if this is + // a non-blocking test run. + if (!ts_fn_info->block_on_full && !queue_was_full) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never full", NAPI_AUTO_LENGTH); + } + + // Assert that the queue was marked as closing at least once, if this is an + // aborting test run. + if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never closing", NAPI_AUTO_LENGTH); + } + + if (!queue_was_closing && + napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Getting the data into JS +static void call_js(napi_env env, napi_value cb, void* hint, void* data) { + if (!(env == NULL || cb == NULL)) { + napi_value argv, undefined; + NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv, + NULL)); + } +} + +// Cleanup +static napi_value StopThread(napi_env env, napi_callback_info info) { + size_t argc = 2; + napi_value argv[2]; + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_valuetype value_type; + NAPI_CALL(env, napi_typeof(env, argv[0], &value_type)); + NAPI_ASSERT(env, value_type == napi_function, + "StopThread argument is a function"); + NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function"); + NAPI_CALL(env, + napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb))); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + NAPI_CALL(env, + napi_release_threadsafe_function(ts_fn, + abort ? napi_tsfn_abort : napi_tsfn_release)); + ts_fn = NULL; + return NULL; +} + +// Join the thread and inform JS that we're done. +static void join_the_threads(napi_env env, void *data, void *hint) { + uv_thread_t *the_threads = data; + ts_fn_hint *the_hint = hint; + napi_value js_cb, undefined; + + uv_thread_join(&the_threads[0]); + if (the_hint->start_secondary) { + uv_thread_join(&the_threads[1]); + } + + NAPI_CALL_RETURN_VOID(env, + napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, + napi_call_function(env, undefined, js_cb, 0, NULL, NULL)); + NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env, + the_hint->js_finalize_cb)); +} + +static napi_value StartThreadInternal(napi_env env, + napi_callback_info info, + napi_threadsafe_function_call_js cb, + bool block_on_full) { + size_t argc = 4; + napi_value argv[4]; + + ts_info.block_on_full = + (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); + + NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function"); + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_value async_name; + NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test", + NAPI_AUTO_LENGTH, &async_name)); + NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size)); + NAPI_CALL(env, napi_create_threadsafe_function(env, + argv[0], + NULL, + async_name, + ts_info.max_queue_size, + 2, + uv_threads, + join_the_threads, + &ts_info, + cb, + &ts_fn)); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; + NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary))); + + NAPI_ASSERT(env, + (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0), + "Thread creation"); + + return NULL; +} + +static napi_value Unref(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn)); + return NULL; +} + +static napi_value Release(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release)); + return NULL; +} + +// Startup +static napi_value StartThread(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_js, true); +} + +static napi_value StartThreadNonblocking(napi_env env, + napi_callback_info info) { + return StartThreadInternal(env, info, call_js, false); +} + +static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, NULL, true); +} + +// Module init +static napi_value Init(napi_env env, napi_value exports) { + size_t index; + for (index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + napi_value js_array_length, js_max_queue_size; + napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size); + + napi_property_descriptor properties[] = { + { + "ARRAY_LENGTH", + NULL, + NULL, + NULL, + NULL, + js_array_length, + napi_enumerable, + NULL + }, + { + "MAX_QUEUE_SIZE", + NULL, + NULL, + NULL, + NULL, + js_max_queue_size, + napi_enumerable, + NULL + }, + DECLARE_NAPI_PROPERTY("StartThread", StartThread), + DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative), + DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking), + DECLARE_NAPI_PROPERTY("StopThread", StopThread), + DECLARE_NAPI_PROPERTY("Unref", Unref), + DECLARE_NAPI_PROPERTY("Release", Release), + }; + + NAPI_CALL(env, napi_define_properties(env, exports, + sizeof(properties)/sizeof(properties[0]), properties)); + + return exports; +} +NAPI_MODULE(NODE_GYP_MODULE_NAME, Init) diff --git a/test/node-api/test_threadsafe_function/binding.gyp b/test/node-api/test_threadsafe_function/binding.gyp new file mode 100644 index 0000000000..b60352e05a --- /dev/null +++ b/test/node-api/test_threadsafe_function/binding.gyp @@ -0,0 +1,8 @@ +{ + 'targets': [ + { + 'target_name': 'binding', + 'sources': ['binding.c'] + } + ] +} diff --git a/test/node-api/test_threadsafe_function/test.js b/test/node-api/test_threadsafe_function/test.js new file mode 100644 index 0000000000..cfd0028530 --- /dev/null +++ b/test/node-api/test_threadsafe_function/test.js @@ -0,0 +1,206 @@ +'use strict'; + +const common = require('../../common'); +const assert = require('assert'); +const binding = require(`./build/${common.buildType}/binding`); +const { fork } = require('child_process'); +const expectedArray = (function(arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; +})(binding.ARRAY_LENGTH); + +// Handle the rapid teardown test case as the child process. We unref the +// thread-safe function after we have received two values. This causes the +// process to exit and the environment cleanup handler to be invoked. +if (process.argv[2] === 'child') { + let callCount = 0; + binding.StartThread((value) => { + callCount++; + console.log(value); + if (callCount === 2) { + binding.Unref(); + } + }, false /* abort */, true /* launchSecondary */, +process.argv[3]); + + // Release the thread-safe function from the main thread so that it may be + // torn down via the environment cleanup handler. + binding.Release(); + return; +} + +function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + maxQueueSize, + launchSecondary }) { + return new Promise((resolve) => { + const array = []; + binding[threadStarter](function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(array); + }), !!abort); + }); + } + }, !!abort, !!launchSecondary, maxQueueSize); + if (threadStarter === 'StartThreadNonblocking') { + // Let's make this thread really busy for a short while to ensure that + // the queue fills and the thread receives a napi_queue_full. + const start = Date.now(); + while (Date.now() - start < 200); + } + }); +} + +function testUnref(queueSize) { + return new Promise((resolve, reject) => { + let output = ''; + const child = fork(__filename, ['child', queueSize], { + stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] + }); + child.on('close', (code) => { + if (code === 0) { + resolve(output.match(/\S+/g)); + } else { + reject(new Error('Child process died with code ' + code)); + } + }); + child.stdout.on('data', (data) => (output += data.toString())); + }) + .then((result) => assert.strictEqual(result.indexOf(0), -1)); +} + +new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.StartThreadNoNative(function testCallback() { + callCount++; + + // The default call-into-JS implementation passes no arguments. + assert.strictEqual(arguments.length, 0); + if (callCount === binding.ARRAY_LENGTH) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(); + }), false); + }); + } + }, false /* abort */, false /* launchSecondary */, binding.MAX_QUEUE_SIZE); +}) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode with an infinite queue, and assert that all +// values are passed. Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: 0, + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode with an infinite queue, and assert that all +// values are passed. Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: 0, + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that it could not finish. +// Quit early by aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start the thread in blocking mode with an infinite queue, and assert that it +// could not finish. Quit early by aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + maxQueueSize: 0, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start the thread in non-blocking mode, and assert that it could not finish. +// Quit early and aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start a child process to test rapid teardown +.then(() => testUnref(binding.MAX_QUEUE_SIZE)) + +// Start a child process with an infinite queue to test rapid teardown +.then(() => testUnref(0)); |
