Primeiro commit do projeto Angular
This commit is contained in:
+284
@@ -0,0 +1,284 @@
|
||||
import { test } from 'node:test';
|
||||
import assert from 'node:assert/strict';
|
||||
import { EventEmitter } from 'node:events';
|
||||
import Piscina from '..';
|
||||
import { resolve } from 'path';
|
||||
|
||||
const TIMEOUT_MAX = 2 ** 31 - 1;
|
||||
|
||||
test('tasks can be aborted through AbortController while running', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
|
||||
});
|
||||
|
||||
const buf = new Int32Array(new SharedArrayBuffer(4));
|
||||
const abortController = new AbortController();
|
||||
assert.rejects(pool.run(buf, { signal: abortController.signal }),
|
||||
/The task has been aborted/);
|
||||
|
||||
Atomics.wait(buf, 0, 0);
|
||||
assert.strictEqual(Atomics.load(buf, 0), 1);
|
||||
|
||||
abortController.abort();
|
||||
});
|
||||
|
||||
test('tasks can be aborted through EventEmitter while running', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
|
||||
});
|
||||
|
||||
const buf = new Int32Array(new SharedArrayBuffer(4));
|
||||
const ee = new EventEmitter();
|
||||
assert.rejects(pool.run(buf, { signal: ee }), /The task has been aborted/);
|
||||
|
||||
Atomics.wait(buf, 0, 0);
|
||||
assert.strictEqual(Atomics.load(buf, 0), 1);
|
||||
|
||||
ee.emit('abort');
|
||||
});
|
||||
|
||||
test('tasks can be aborted through EventEmitter before running', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
|
||||
maxThreads: 1
|
||||
});
|
||||
|
||||
const bufs = [
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4))
|
||||
];
|
||||
const ee = new EventEmitter();
|
||||
const task1 = pool.run(bufs[0]);
|
||||
const abortable = pool.run(bufs[1], { signal: ee });
|
||||
assert.strictEqual(pool.queueSize, 0); // Means it's running
|
||||
assert.rejects(abortable, /The task has been aborted/);
|
||||
|
||||
ee.emit('abort');
|
||||
|
||||
// Wake up the thread handling the first task.
|
||||
Atomics.store(bufs[0], 0, 1);
|
||||
Atomics.notify(bufs[0], 0, 1);
|
||||
await task1;
|
||||
});
|
||||
|
||||
test('abortable tasks will not share workers (abortable posted second)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
|
||||
maxThreads: 1,
|
||||
concurrentTasksPerWorker: 2
|
||||
});
|
||||
|
||||
const bufs = [
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4))
|
||||
];
|
||||
const task1 = pool.run(bufs[0]);
|
||||
const ee = new EventEmitter();
|
||||
assert.rejects(pool.run(bufs[1], { signal: ee }), /The task has been aborted/);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
ee.emit('abort');
|
||||
|
||||
// Wake up the thread handling the first task.
|
||||
Atomics.store(bufs[0], 0, 1);
|
||||
Atomics.notify(bufs[0], 0, 1);
|
||||
await task1;
|
||||
});
|
||||
|
||||
// TODO: move to testing balancer
|
||||
test('abortable tasks will not share workers (abortable posted first)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
concurrentTasksPerWorker: 2
|
||||
});
|
||||
|
||||
const ee = new EventEmitter();
|
||||
assert.rejects(pool.run('while(true);', { signal: ee }), /The task has been aborted/);
|
||||
const task2 = pool.run('42');
|
||||
assert.strictEqual(pool.queueSize, 1);
|
||||
|
||||
ee.emit('abort');
|
||||
|
||||
// Wake up the thread handling the second task.
|
||||
assert.strictEqual(await task2, 42);
|
||||
});
|
||||
|
||||
// TODO: move to testing balancer
|
||||
test('abortable tasks will not share workers (on worker available)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/sleep.js'),
|
||||
maxThreads: 1,
|
||||
concurrentTasksPerWorker: 2
|
||||
});
|
||||
|
||||
// Task 1 will sleep 100 ms then complete,
|
||||
// Task 2 will sleep 300 ms then complete.
|
||||
// Abortable task 3 should still be in the queue
|
||||
// when Task 1 completes, but should not be selected
|
||||
// until after Task 2 completes because it is abortable.
|
||||
const ret = await Promise.all([
|
||||
pool.run({ time: 100, a: 1 }),
|
||||
pool.run({ time: 300, a: 2 }),
|
||||
pool.run({ time: 100, a: 3 }, { signal: new EventEmitter() })
|
||||
]);
|
||||
|
||||
assert.strictEqual(ret[0], 0);
|
||||
assert.strictEqual(ret[1], 1);
|
||||
assert.strictEqual(ret[2], 2);
|
||||
});
|
||||
|
||||
test('abortable tasks will not share workers (destroy workers)', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/sleep.js'),
|
||||
maxThreads: 1,
|
||||
concurrentTasksPerWorker: 2
|
||||
});
|
||||
|
||||
// Task 1 will sleep 0 ms then complete,
|
||||
// Task 2 will sleep indefinitely (TIMEOUT_MAX) then complete.
|
||||
// Abortable task 3 should still be in the queue
|
||||
// when Task 1 completes, but should not be selected
|
||||
// until after Task 2 completes because it is abortable.
|
||||
|
||||
pool.run({ time: 0, a: 1 }).then(() => {
|
||||
pool.destroy();
|
||||
});
|
||||
|
||||
assert.rejects(pool.run({ time: TIMEOUT_MAX, a: 2 }), /Terminating worker thread/);
|
||||
assert.rejects(pool.run({ time: TIMEOUT_MAX, a: 3 }, { signal: new EventEmitter() }),
|
||||
/Terminating worker thread/);
|
||||
});
|
||||
|
||||
test('aborted AbortSignal rejects task immediately', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/move.ts')
|
||||
});
|
||||
|
||||
const controller = new AbortController();
|
||||
// Abort the controller early
|
||||
controller.abort();
|
||||
assert.strictEqual(controller.signal.aborted, true);
|
||||
|
||||
// The data won't be moved because the task will abort immediately.
|
||||
const data = new Uint8Array(new SharedArrayBuffer(4));
|
||||
assert.rejects(pool.run(data, { signal: controller.signal, transferList: [data.buffer] }),
|
||||
/The task has been aborted/);
|
||||
|
||||
assert.strictEqual(data.length, 4);
|
||||
});
|
||||
|
||||
test('task with AbortSignal cleans up properly', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
const ee = new EventEmitter();
|
||||
|
||||
await pool.run('1+1', { signal: ee });
|
||||
|
||||
const { getEventListeners } = EventEmitter as any;
|
||||
if (typeof getEventListeners === 'function') {
|
||||
assert.strictEqual(getEventListeners(ee, 'abort').length, 0);
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
|
||||
await pool.run('1+1', { signal: controller.signal });
|
||||
});
|
||||
|
||||
test('aborted AbortSignal rejects task immediately (with reason)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/move.ts')
|
||||
});
|
||||
const customReason = new Error('custom reason');
|
||||
|
||||
const controller = new AbortController();
|
||||
controller.abort(customReason);
|
||||
assert.strictEqual(controller.signal.aborted, true);
|
||||
assert.strictEqual(controller.signal.reason, customReason);
|
||||
|
||||
// The data won't be moved because the task will abort immediately.
|
||||
const data = new Uint8Array(new SharedArrayBuffer(4));
|
||||
|
||||
try {
|
||||
await pool.run(data, { transferList: [data.buffer], signal: controller.signal });
|
||||
} catch (error) {
|
||||
assert.strictEqual(error.message, 'The task has been aborted');
|
||||
assert.strictEqual(error.cause, customReason);
|
||||
}
|
||||
|
||||
assert.strictEqual(data.length, 4);
|
||||
});
|
||||
|
||||
test('tasks can be aborted through AbortController while running', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
|
||||
});
|
||||
const reason = new Error('custom reason');
|
||||
|
||||
const buf = new Int32Array(new SharedArrayBuffer(4));
|
||||
const abortController = new AbortController();
|
||||
|
||||
try {
|
||||
const promise = pool.run(buf, { signal: abortController.signal });
|
||||
|
||||
Atomics.wait(buf, 0, 0);
|
||||
assert.strictEqual(Atomics.load(buf, 0), 1);
|
||||
|
||||
abortController.abort(reason);
|
||||
|
||||
await promise;
|
||||
} catch (error) {
|
||||
assert.strictEqual(error.message, 'The task has been aborted');
|
||||
assert.strictEqual(error.cause, reason);
|
||||
}
|
||||
});
|
||||
|
||||
test('aborted AbortSignal rejects task immediately (with reason)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/move.ts')
|
||||
});
|
||||
const customReason = new Error('custom reason');
|
||||
|
||||
const controller = new AbortController();
|
||||
controller.abort(customReason);
|
||||
assert.strictEqual(controller.signal.aborted, true);
|
||||
assert.strictEqual(controller.signal.reason, customReason);
|
||||
|
||||
// The data won't be moved because the task will abort immediately.
|
||||
const data = new Uint8Array(new SharedArrayBuffer(4));
|
||||
|
||||
try {
|
||||
await pool.run(data, { transferList: [data.buffer], signal: controller.signal });
|
||||
} catch (error) {
|
||||
assert.strictEqual(error.message, 'The task has been aborted');
|
||||
assert.strictEqual(error.cause, customReason);
|
||||
}
|
||||
|
||||
assert.strictEqual(data.length, 4);
|
||||
});
|
||||
|
||||
test('tasks can be aborted through AbortController while running', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
|
||||
});
|
||||
const reason = new Error('custom reason');
|
||||
|
||||
const buf = new Int32Array(new SharedArrayBuffer(4));
|
||||
const abortController = new AbortController();
|
||||
|
||||
try {
|
||||
const promise = pool.run(buf, { signal: abortController.signal });
|
||||
|
||||
Atomics.wait(buf, 0, 0);
|
||||
assert.strictEqual(Atomics.load(buf, 0), 1);
|
||||
|
||||
abortController.abort(reason);
|
||||
|
||||
await promise;
|
||||
} catch (error) {
|
||||
assert.strictEqual(error.message, 'The task has been aborted');
|
||||
assert.strictEqual(error.cause, reason);
|
||||
}
|
||||
});
|
||||
+45
@@ -0,0 +1,45 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import { createHook, executionAsyncId } from 'node:async_hooks';
|
||||
import Piscina from '..';
|
||||
|
||||
|
||||
test('postTask() calls the correct async hooks', async () => {
|
||||
let taskId;
|
||||
let initCalls = 0;
|
||||
let beforeCalls = 0;
|
||||
let afterCalls = 0;
|
||||
let resolveCalls = 0;
|
||||
|
||||
const hook = createHook({
|
||||
init (id, type) {
|
||||
if (type === 'Piscina.Task') {
|
||||
initCalls++;
|
||||
taskId = id;
|
||||
}
|
||||
},
|
||||
before (id) {
|
||||
if (id === taskId) beforeCalls++;
|
||||
},
|
||||
after (id) {
|
||||
if (id === taskId) afterCalls++;
|
||||
},
|
||||
promiseResolve () {
|
||||
if (executionAsyncId() === taskId) resolveCalls++;
|
||||
}
|
||||
});
|
||||
hook.enable();
|
||||
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
await pool.run('42');
|
||||
|
||||
hook.disable();
|
||||
assert.strictEqual(initCalls, 1);
|
||||
assert.strictEqual(beforeCalls, 1);
|
||||
assert.strictEqual(afterCalls, 1);
|
||||
assert.strictEqual(resolveCalls, 1);
|
||||
});
|
||||
+119
@@ -0,0 +1,119 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '..';
|
||||
|
||||
test('coverage test for Atomics optimization (sync mode)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/notify-then-sleep-or.js'),
|
||||
minThreads: 2,
|
||||
maxThreads: 2,
|
||||
concurrentTasksPerWorker: 2,
|
||||
atomics: 'sync'
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
let v: number;
|
||||
|
||||
// Post 4 tasks, and wait for all of them to be ready.
|
||||
const i32array = new Int32Array(new SharedArrayBuffer(4));
|
||||
for (let index = 0; index < 4; index++) {
|
||||
tasks.push(pool.run({ i32array, index }));
|
||||
}
|
||||
|
||||
// Wait for 2 tasks to enter 'wait' state.
|
||||
do {
|
||||
v = Atomics.load(i32array, 0);
|
||||
if (popcount8(v) >= 2) break;
|
||||
Atomics.wait(i32array, 0, v);
|
||||
} while (true);
|
||||
|
||||
// The check above could also be !== 2 but it's hard to get things right
|
||||
// sometimes and this gives us a nice assertion. Basically, at this point
|
||||
// exactly 2 tasks should be in Atomics.wait() state.
|
||||
assert.strictEqual(popcount8(v), 2);
|
||||
// Wake both tasks up as simultaneously as possible. The other 2 tasks should
|
||||
// then start executing.
|
||||
Atomics.store(i32array, 0, 0);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
|
||||
// Wait for the other 2 tasks to enter 'wait' state.
|
||||
do {
|
||||
v = Atomics.load(i32array, 0);
|
||||
if (popcount8(v) >= 2) break;
|
||||
Atomics.wait(i32array, 0, v);
|
||||
} while (true);
|
||||
|
||||
// At this point, the first two tasks are definitely finished and have
|
||||
// definitely posted results back to the main thread, and the main thread
|
||||
// has definitely not received them yet, meaning that the Atomics check will
|
||||
// be used. Making sure that that works is the point of this test.
|
||||
|
||||
// Wake up the remaining 2 tasks in order to make sure that the test finishes.
|
||||
// Do the same consistency check beforehand as above.
|
||||
assert.strictEqual(popcount8(v), 2);
|
||||
Atomics.store(i32array, 0, 0);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
|
||||
await Promise.all(tasks);
|
||||
});
|
||||
|
||||
// Inefficient but straightforward 8-bit popcount
|
||||
function popcount8 (v : number) : number {
|
||||
v &= 0xff;
|
||||
if (v & 0b11110000) return popcount8(v >>> 4) + popcount8(v & 0xb00001111);
|
||||
if (v & 0b00001100) return popcount8(v >>> 2) + popcount8(v & 0xb00000011);
|
||||
if (v & 0b00000010) return popcount8(v >>> 1) + popcount8(v & 0xb00000001);
|
||||
return v;
|
||||
}
|
||||
|
||||
test('avoids unbounded recursion', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts'),
|
||||
minThreads: 2,
|
||||
maxThreads: 2,
|
||||
atomics: 'sync'
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
for (let i = 1; i <= 10000; i++) {
|
||||
tasks.push(pool.run(null));
|
||||
}
|
||||
|
||||
await Promise.all(tasks);
|
||||
});
|
||||
|
||||
test('enable async mode', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval-params.js'),
|
||||
minThreads: 1,
|
||||
maxThreads: 1,
|
||||
atomics: 'async'
|
||||
});
|
||||
|
||||
const bufs = [
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4))
|
||||
];
|
||||
|
||||
const script = `
|
||||
setTimeout(() => { Atomics.add(input.shared[0], 0, 1); Atomics.notify(input.shared[0], 0, Infinity); }, 100);
|
||||
setTimeout(() => { Atomics.add(input.shared[1], 0, 1); Atomics.notify(input.shared[1], 0, Infinity); }, 300);
|
||||
setTimeout(() => { Atomics.add(input.shared[2], 0, 1); Atomics.notify(input.shared[2], 0, Infinity); }, 500);
|
||||
|
||||
true
|
||||
`;
|
||||
|
||||
const promise = pool.run({
|
||||
code: script,
|
||||
shared: bufs
|
||||
});
|
||||
|
||||
const atResult1 = Atomics.wait(bufs[0], 0, 0);
|
||||
const atResult2 = Atomics.wait(bufs[1], 0, 0);
|
||||
const atResult3 = Atomics.wait(bufs[2], 0, 0);
|
||||
|
||||
assert.deepStrictEqual([atResult1, atResult2, atResult3], ['ok', 'ok', 'ok']);
|
||||
assert.strictEqual(await promise, true);
|
||||
});
|
||||
+25
@@ -0,0 +1,25 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { resolve } from 'node:path';
|
||||
import { spawn } from 'node:child_process';
|
||||
import { test } from 'node:test';
|
||||
import concat from 'concat-stream';
|
||||
|
||||
test('console.log() calls are not blocked by Atomics.wait() (sync mode)', async () => {
|
||||
const proc = spawn(process.execPath, [
|
||||
...process.execArgv, resolve(__dirname, 'fixtures/console-log.ts')
|
||||
], {
|
||||
stdio: ['inherit', 'pipe', 'pipe'],
|
||||
env: {
|
||||
PISCINA_ENABLE_ASYNC_ATOMICS: '0'
|
||||
}
|
||||
});
|
||||
|
||||
const dataStdout = await new Promise((resolve) => {
|
||||
proc.stdout.setEncoding('utf8').pipe(concat(resolve));
|
||||
});
|
||||
const dataStderr = await new Promise((resolve) => {
|
||||
proc.stderr.setEncoding('utf8').pipe(concat(resolve));
|
||||
});
|
||||
assert.strictEqual(dataStdout, 'A\n');
|
||||
assert.strictEqual(dataStderr, 'B\n');
|
||||
});
|
||||
+211
@@ -0,0 +1,211 @@
|
||||
import assert from 'node:assert';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import { kQueueOptions } from '../dist/symbols';
|
||||
import { Piscina, FixedQueue, PiscinaTask as Task } from '..';
|
||||
|
||||
// @ts-expect-error - it misses several properties, but it's enough for the test
|
||||
class QueueTask implements Task {
|
||||
get [kQueueOptions] () {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
test('queue length', () => {
|
||||
const queue = new FixedQueue();
|
||||
|
||||
assert.strictEqual(queue.size, 0);
|
||||
|
||||
queue.push(new QueueTask());
|
||||
|
||||
assert.strictEqual(queue.size, 1);
|
||||
|
||||
queue.shift();
|
||||
|
||||
assert.strictEqual(queue.size, 0);
|
||||
});
|
||||
|
||||
test('queue length should not become negative', () => {
|
||||
const queue = new FixedQueue();
|
||||
|
||||
assert.strictEqual(queue.size, 0);
|
||||
|
||||
queue.shift();
|
||||
|
||||
assert.strictEqual(queue.size, 0);
|
||||
});
|
||||
|
||||
test('queue remove', () => {
|
||||
const queue = new FixedQueue();
|
||||
|
||||
const task = new QueueTask();
|
||||
|
||||
assert.strictEqual(queue.size, 0, 'should be empty on start');
|
||||
|
||||
queue.push(task);
|
||||
|
||||
assert.strictEqual(queue.size, 1, 'should contain single task after push');
|
||||
|
||||
queue.remove(task);
|
||||
|
||||
assert.strictEqual(queue.size, 0, 'should be empty after task removal');
|
||||
});
|
||||
|
||||
test('remove not queued task should not lead to errors', () => {
|
||||
const queue = new FixedQueue();
|
||||
|
||||
const task = new QueueTask();
|
||||
|
||||
assert.strictEqual(queue.size, 0, 'should be empty on start');
|
||||
|
||||
queue.remove(task);
|
||||
|
||||
assert.strictEqual(queue.size, 0, 'should be empty after task removal');
|
||||
});
|
||||
|
||||
test('removing elements from intermediate CircularBuffer should not lead to issues', () => {
|
||||
/*
|
||||
The test intends to check following scenario:
|
||||
1) We fill the queue with 3 full circular buffers amount of items.
|
||||
2) Empty the middle circular buffer with remove().
|
||||
3) This should lead to the removal of the middle buffer from the queue:
|
||||
- Before emptying: tail buffer -> middle buffer -> head buffer.
|
||||
- After emptying: tail buffer -> head buffer.
|
||||
*/
|
||||
|
||||
const queue = new FixedQueue();
|
||||
|
||||
// size of single circular buffer
|
||||
const batchSize = 2047;
|
||||
|
||||
const firstBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
const secondBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
const thirdBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
|
||||
const tasks = firstBatch.concat(secondBatch, thirdBatch);
|
||||
|
||||
for (const task of tasks) {
|
||||
queue.push(task);
|
||||
}
|
||||
assert.strictEqual(queue.size, tasks.length, `should contain ${batchSize} * 3 items`);
|
||||
|
||||
let size = queue.size;
|
||||
for (const task of secondBatch) {
|
||||
queue.remove(task);
|
||||
assert.strictEqual(queue.size, --size, `should contain ${size} items`);
|
||||
}
|
||||
|
||||
const expected = firstBatch.concat(thirdBatch);
|
||||
const actual = [];
|
||||
while (!queue.isEmpty()) {
|
||||
const task = queue.shift();
|
||||
actual.push(task);
|
||||
}
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
test('removing elements from first CircularBuffer should not lead to issues', () => {
|
||||
/*
|
||||
The test intends to check following scenario:
|
||||
1) We fill the queue with 3 full circular buffers amount of items.
|
||||
2) Empty the first circular buffer with remove().
|
||||
3) This should lead to the removal of the tail buffer from the queue:
|
||||
- Before emptying: tail buffer -> middle buffer -> head buffer.
|
||||
- After emptying: tail buffer (previously middle) -> head buffer.
|
||||
*/
|
||||
const queue = new FixedQueue();
|
||||
|
||||
// size of single circular buffer
|
||||
const batchSize = 2047;
|
||||
|
||||
const firstBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
const secondBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
const thirdBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
|
||||
const tasks = firstBatch.concat(secondBatch, thirdBatch);
|
||||
|
||||
for (const task of tasks) {
|
||||
queue.push(task);
|
||||
}
|
||||
assert.strictEqual(queue.size, tasks.length, `should contain ${batchSize} * 3 items`);
|
||||
|
||||
let size = queue.size;
|
||||
for (const task of firstBatch) {
|
||||
queue.remove(task);
|
||||
assert.strictEqual(queue.size, --size, `should contain ${size} items`);
|
||||
}
|
||||
|
||||
const expected = secondBatch.concat(thirdBatch);
|
||||
const actual = [];
|
||||
while (!queue.isEmpty()) {
|
||||
const task = queue.shift();
|
||||
actual.push(task);
|
||||
}
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
test('removing elements from last CircularBuffer should not lead to issues', async () => {
|
||||
/*
|
||||
The test intends to check following scenario:
|
||||
1) We fill the queue with 3 full circular buffers amount of items.
|
||||
2) Empty the last circular buffer with remove().
|
||||
3) This should lead to the removal of the head buffer from the queue:
|
||||
- Before emptying: tail buffer -> middle buffer -> head buffer.
|
||||
- After emptying: tail buffer -> head buffer (previously middle).
|
||||
*/
|
||||
const queue = new FixedQueue();
|
||||
|
||||
// size of single circular buffer
|
||||
const batchSize = 2047;
|
||||
|
||||
const firstBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
const secondBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
const thirdBatch = Array.from({ length: batchSize }, () => new QueueTask());
|
||||
|
||||
const tasks = firstBatch.concat(secondBatch, thirdBatch);
|
||||
|
||||
for (const task of tasks) {
|
||||
queue.push(task);
|
||||
}
|
||||
assert.strictEqual(queue.size, tasks.length, `should contain ${batchSize} * 3 items`);
|
||||
|
||||
let size = queue.size;
|
||||
for (const task of thirdBatch) {
|
||||
queue.remove(task);
|
||||
assert.strictEqual(queue.size, --size, `should contain ${size} items`);
|
||||
}
|
||||
|
||||
const expected = firstBatch.concat(secondBatch);
|
||||
const actual = [];
|
||||
while (!queue.isEmpty()) {
|
||||
const task = queue.shift();
|
||||
actual.push(task);
|
||||
}
|
||||
assert.deepEqual(actual, expected);
|
||||
});
|
||||
|
||||
test('simple integraion with Piscina', async () => {
|
||||
const queue = new FixedQueue();
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-isworkerthread-named-import.ts'),
|
||||
taskQueue: queue
|
||||
});
|
||||
|
||||
const result = await pool.run(null);
|
||||
assert.strictEqual(result, 'done');
|
||||
});
|
||||
|
||||
test('concurrent calls with Piscina', async () => {
|
||||
const queue = new FixedQueue();
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval-async.js'),
|
||||
taskQueue: queue
|
||||
});
|
||||
|
||||
const tasks = ['1+1', '2+2', '3+3'];
|
||||
const results = await Promise.all(tasks.map((task) => pool.run(task)));
|
||||
// eslint-disable-next-line
|
||||
const expected = tasks.map(eval);
|
||||
|
||||
assert.deepEqual(results, expected);
|
||||
});
|
||||
+12
@@ -0,0 +1,12 @@
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '../..';
|
||||
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'eval.js'),
|
||||
maxThreads: 1,
|
||||
env: {
|
||||
PISCINA_ENABLE_ASYNC_ATOMICS: process.env.PISCINA_ENABLE_ASYNC_ATOMICS
|
||||
}
|
||||
});
|
||||
|
||||
pool.run('console.log("A"); console.error("B");');
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
import { promisify } from 'node:util';
|
||||
|
||||
const sleep = promisify(setTimeout);
|
||||
|
||||
// eslint-disable-next-line no-eval
|
||||
function handler (code) { return eval(code); }
|
||||
|
||||
async function load () {
|
||||
await sleep(5);
|
||||
return handler;
|
||||
}
|
||||
|
||||
export default load();
|
||||
+2
@@ -0,0 +1,2 @@
|
||||
// eslint-disable-next-line no-eval
|
||||
export default function (code) { return eval(code); };
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
'use strict';
|
||||
|
||||
const { promisify } = require('node:util');
|
||||
|
||||
const sleep = promisify(setTimeout);
|
||||
|
||||
// eslint-disable-next-line no-eval
|
||||
function handler (code) { return eval(code); }
|
||||
|
||||
async function load () {
|
||||
await sleep(5);
|
||||
return handler;
|
||||
}
|
||||
|
||||
module.exports = load();
|
||||
+2
@@ -0,0 +1,2 @@
|
||||
// eslint-disable-next-line no-eval
|
||||
module.exports = function (input) { return eval(input.code); };
|
||||
+2
@@ -0,0 +1,2 @@
|
||||
// eslint-disable-next-line no-eval
|
||||
module.exports = function (code) { return eval(code); };
|
||||
+10
@@ -0,0 +1,10 @@
|
||||
import assert from 'node:assert';
|
||||
import { types } from 'node:util';
|
||||
import Piscina from '../..';
|
||||
|
||||
export default function (moved) {
|
||||
if (moved !== undefined) {
|
||||
assert(types.isAnyArrayBuffer(moved));
|
||||
}
|
||||
return Piscina.move(new ArrayBuffer(10));
|
||||
}
|
||||
+10
@@ -0,0 +1,10 @@
|
||||
'use strict';
|
||||
|
||||
function a () { return 'a'; }
|
||||
|
||||
function b () { return 'b'; }
|
||||
|
||||
a.a = a;
|
||||
a.b = b;
|
||||
|
||||
module.exports = a;
|
||||
+10
@@ -0,0 +1,10 @@
|
||||
// Set the index-th bith in i32array[0], then wait for it to be un-set again.
|
||||
module.exports = function ({ i32array, index }) {
|
||||
Atomics.or(i32array, 0, 1 << index);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
do {
|
||||
const v = Atomics.load(i32array, 0);
|
||||
if (!(v & (1 << index))) break;
|
||||
Atomics.wait(i32array, 0, v);
|
||||
} while (true);
|
||||
};
|
||||
+10
@@ -0,0 +1,10 @@
|
||||
// Set the index-th bith in i32array[0], then wait for it to be un-set again.
|
||||
module.exports = function ({ i32array, index }) {
|
||||
Atomics.or(i32array, 0, 1 << index);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
do {
|
||||
const v = Atomics.load(i32array, 0);
|
||||
if (!(v & (1 << index))) break;
|
||||
Atomics.wait(i32array, 0, v);
|
||||
} while (true);
|
||||
};
|
||||
+5
@@ -0,0 +1,5 @@
|
||||
module.exports = function (i32array) {
|
||||
Atomics.store(i32array, 0, 1);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
Atomics.wait(i32array, 0, 1);
|
||||
};
|
||||
+8
@@ -0,0 +1,8 @@
|
||||
'use strict';
|
||||
|
||||
module.exports = () => {
|
||||
const array = [];
|
||||
while (true) {
|
||||
array.push([array]);
|
||||
}
|
||||
};
|
||||
+18
@@ -0,0 +1,18 @@
|
||||
'use strict';
|
||||
|
||||
const Piscina = require('../../dist');
|
||||
|
||||
let time;
|
||||
module.exports = {
|
||||
send: async () => {
|
||||
const data = new ArrayBuffer(128);
|
||||
try {
|
||||
return Piscina.move(data);
|
||||
} finally {
|
||||
setTimeout(() => { time = data.byteLength; }, 5);
|
||||
}
|
||||
},
|
||||
get: () => {
|
||||
return time;
|
||||
}
|
||||
};
|
||||
+38
@@ -0,0 +1,38 @@
|
||||
'use strict';
|
||||
|
||||
const Piscina = require('../../dist');
|
||||
|
||||
class Shared {
|
||||
constructor (data) {
|
||||
this.name = 'shared';
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
get [Piscina.transferableSymbol] () {
|
||||
return [this.data];
|
||||
}
|
||||
|
||||
get [Piscina.valueSymbol] () {
|
||||
return { name: this.name, data: this.data };
|
||||
}
|
||||
|
||||
make () {
|
||||
return Piscina.move(this);
|
||||
}
|
||||
}
|
||||
|
||||
let time;
|
||||
module.exports = {
|
||||
send: async () => {
|
||||
const data = new ArrayBuffer(128);
|
||||
const shared = new Shared(data);
|
||||
try {
|
||||
return shared.make();
|
||||
} finally {
|
||||
setTimeout(() => { time = data.byteLength; }, 5);
|
||||
}
|
||||
},
|
||||
get: () => {
|
||||
return time;
|
||||
}
|
||||
};
|
||||
+6
@@ -0,0 +1,6 @@
|
||||
import assert from 'node:assert';
|
||||
import { isWorkerThread } from '../..';
|
||||
|
||||
assert.strictEqual(isWorkerThread, true);
|
||||
|
||||
export default function () { return 'done'; }
|
||||
+6
@@ -0,0 +1,6 @@
|
||||
import assert from 'node:assert';
|
||||
import Piscina from '../..';
|
||||
|
||||
assert.strictEqual(Piscina.isWorkerThread, true);
|
||||
|
||||
export default function () { return 'done'; }
|
||||
+6
@@ -0,0 +1,6 @@
|
||||
import assert from 'node:assert';
|
||||
import { workerData } from '../..';
|
||||
|
||||
assert.strictEqual(workerData, 'ABC');
|
||||
|
||||
export default function () { return 'done'; }
|
||||
+6
@@ -0,0 +1,6 @@
|
||||
import assert from 'node:assert';
|
||||
import Piscina from '../..';
|
||||
|
||||
assert.strictEqual(Piscina.workerData, 'ABC');
|
||||
|
||||
export default function () { return 'done'; }
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
'use strict';
|
||||
|
||||
const { promisify } = require('node:util');
|
||||
|
||||
const sleep = promisify(setTimeout);
|
||||
|
||||
const buf = new Uint32Array(new SharedArrayBuffer(4));
|
||||
|
||||
module.exports = async ({ time = 5, a }) => {
|
||||
await sleep(time);
|
||||
const ret = Atomics.exchange(buf, 0, a);
|
||||
return ret;
|
||||
};
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
// worker.js
|
||||
const vm = require('node:vm');
|
||||
|
||||
module.exports = ({ payload, context }) => {
|
||||
const script = new vm.Script(payload);
|
||||
script.runInNewContext(context);
|
||||
};
|
||||
+5
@@ -0,0 +1,5 @@
|
||||
module.exports = function (i32array) {
|
||||
Atomics.wait(i32array, 0, 0);
|
||||
Atomics.store(i32array, 0, -1);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
};
|
||||
+5
@@ -0,0 +1,5 @@
|
||||
module.exports = function (i32array) {
|
||||
Atomics.wait(i32array, 0, 0);
|
||||
Atomics.store(i32array, 0, -1);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
};
|
||||
+11
@@ -0,0 +1,11 @@
|
||||
import { threadId } from 'worker_threads';
|
||||
|
||||
module.exports = async function ([i32array, n]) {
|
||||
Atomics.add(i32array, 0, 1);
|
||||
Atomics.notify(i32array, 0, Infinity);
|
||||
let lastSeenValue;
|
||||
while ((lastSeenValue = Atomics.load(i32array, 0)) < n) {
|
||||
Atomics.wait(i32array, 0, lastSeenValue);
|
||||
}
|
||||
return threadId;
|
||||
};
|
||||
+46
@@ -0,0 +1,46 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { resolve } from 'node:path';
|
||||
import { test } from 'node:test';
|
||||
import Piscina from '../dist';
|
||||
|
||||
test('Piscina<T , R> works', async () => {
|
||||
const worker = new Piscina<string, number>({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
const result: number = await worker.run('Promise.resolve(42)');
|
||||
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('Piscina with no generic works', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
const result = await worker.run('Promise.resolve("Hello, world!")');
|
||||
|
||||
assert.strictEqual(result, 'Hello, world!');
|
||||
});
|
||||
|
||||
test('Piscina<T, R> typescript complains when invalid Task is supplied as wrong type', async () => {
|
||||
const worker = new Piscina<string, number>({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
// @ts-expect-error complains due to invalid Task being number when expecting string
|
||||
const result = await worker.run(42);
|
||||
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('Piscina<T, R> typescript complains when assigning Result to wrong type', async () => {
|
||||
const worker = new Piscina<string, number>({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
// @ts-expect-error complains due to expecting a number but being assigned to a string
|
||||
const result: string = await worker.run('Promise.resolve(42)');
|
||||
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
+215
@@ -0,0 +1,215 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { resolve } from 'node:path';
|
||||
import { test } from 'node:test';
|
||||
import type { TestContext } from 'node:test';
|
||||
import Piscina from '..';
|
||||
import { PiscinaWorker } from '../dist/worker_pool';
|
||||
|
||||
test('pool will maintain run and wait time histograms by default', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('42'));
|
||||
}
|
||||
await Promise.all(tasks);
|
||||
|
||||
const histogram = pool.histogram;
|
||||
const waitTime = histogram.waitTime;
|
||||
assert.ok(waitTime);
|
||||
assert.strictEqual(typeof waitTime.average, 'number');
|
||||
assert.strictEqual(typeof waitTime.mean, 'number');
|
||||
assert.strictEqual(typeof waitTime.stddev, 'number');
|
||||
assert.strictEqual(typeof waitTime.min, 'number');
|
||||
assert.strictEqual(typeof waitTime.max, 'number');
|
||||
|
||||
const runTime = histogram.runTime;
|
||||
assert.ok(runTime);
|
||||
assert.strictEqual(typeof runTime.average, 'number');
|
||||
assert.strictEqual(typeof runTime.mean, 'number');
|
||||
assert.strictEqual(typeof runTime.stddev, 'number');
|
||||
assert.strictEqual(typeof runTime.min, 'number');
|
||||
assert.strictEqual(typeof runTime.max, 'number');
|
||||
assert.strictEqual(typeof histogram.resetRunTime, 'function');
|
||||
assert.strictEqual(typeof histogram.resetWaitTime, 'function');
|
||||
});
|
||||
|
||||
test('pool will maintain reset histograms upon call', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('42'));
|
||||
}
|
||||
await Promise.all(tasks);
|
||||
|
||||
const histogram = pool.histogram;
|
||||
let waitTime = histogram.waitTime;
|
||||
assert.ok(waitTime);
|
||||
assert.strictEqual(typeof waitTime.average, 'number');
|
||||
assert.strictEqual(typeof waitTime.mean, 'number');
|
||||
assert.strictEqual(typeof waitTime.stddev, 'number');
|
||||
assert.ok(waitTime.min > 0);
|
||||
assert.ok(waitTime.max > 0);
|
||||
|
||||
let runTime = histogram.runTime;
|
||||
assert.ok(runTime);
|
||||
assert.strictEqual(typeof runTime.average, 'number');
|
||||
assert.strictEqual(typeof runTime.mean, 'number');
|
||||
assert.strictEqual(typeof runTime.stddev, 'number');
|
||||
assert.ok(runTime.min > 0);
|
||||
assert.ok(runTime.max > 0);
|
||||
|
||||
histogram.resetRunTime();
|
||||
runTime = histogram.runTime;
|
||||
assert.ok(Number.isNaN(runTime.average));
|
||||
assert.ok(Number.isNaN(runTime.mean));
|
||||
assert.ok(Number.isNaN(runTime.stddev));
|
||||
assert.strictEqual(runTime.max, 0);
|
||||
|
||||
histogram.resetWaitTime();
|
||||
waitTime = histogram.waitTime;
|
||||
assert.ok(Number.isNaN(waitTime.average));
|
||||
assert.ok(Number.isNaN(waitTime.mean));
|
||||
assert.ok(Number.isNaN(waitTime.stddev));
|
||||
assert.strictEqual(waitTime.max, 0);
|
||||
});
|
||||
|
||||
test('pool will maintain run and wait time histograms when recordTiming is true', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
recordTiming: true
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('42'));
|
||||
}
|
||||
await Promise.all(tasks);
|
||||
|
||||
const waitTime = pool.histogram.waitTime;
|
||||
assert.ok(waitTime);
|
||||
|
||||
const runTime = pool.histogram.runTime;
|
||||
assert.ok(runTime);
|
||||
});
|
||||
|
||||
test('pool does not maintain run and wait time histograms when recordTiming is false', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
recordTiming: false
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('42'));
|
||||
}
|
||||
await Promise.all(tasks);
|
||||
|
||||
assert.ok(!pool.histogram.waitTime);
|
||||
assert.ok(!pool.histogram.runTime);
|
||||
});
|
||||
|
||||
test('workers has histogram', async (t: TestContext) => {
|
||||
let index = 0;
|
||||
let list: PiscinaWorker[];
|
||||
// Its expected to have one task get balanced twice due to the load balancer distribution
|
||||
// first task enters, its distributed; second is enqueued, once first is done, second is distributed and normalizes
|
||||
t.plan(4);
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
concurrentTasksPerWorker: 1,
|
||||
workerHistogram: true,
|
||||
loadBalancer (_task, workers) {
|
||||
// Verify distribution to properly test this feature
|
||||
const candidate = workers[index++ % workers.length];
|
||||
|
||||
// We assign it everytime is called to check the histogram
|
||||
// and that the list remains the same
|
||||
list = workers;
|
||||
|
||||
if (candidate.currentUsage !== 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return candidate;
|
||||
}
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 500))'));
|
||||
}
|
||||
await Promise.all(tasks);
|
||||
const histogram = list[0].histogram;
|
||||
t.assert.ok(typeof histogram?.average, 'number');
|
||||
t.assert.ok(typeof histogram?.max, 'number');
|
||||
t.assert.ok(typeof histogram?.mean, 'number');
|
||||
t.assert.ok(typeof histogram?.min, 'number');
|
||||
});
|
||||
|
||||
test('workers does not have histogram if disabled', async (t: TestContext) => {
|
||||
let index = 0;
|
||||
// After each task the balancer is called to distribute the next task
|
||||
// The first task is distributed, the second is enqueued, once the first is done, the second is distributed and normalizes
|
||||
t.plan(5 * 2);
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
concurrentTasksPerWorker: 1,
|
||||
workerHistogram: false,
|
||||
loadBalancer (_task, workers) {
|
||||
// Verify distribution to properly test this feature
|
||||
const candidate = workers[index++ % workers.length];
|
||||
const histogram = candidate.histogram;
|
||||
t.assert.ok(!histogram);
|
||||
|
||||
if (candidate.currentUsage !== 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return candidate;
|
||||
}
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 5))'));
|
||||
}
|
||||
await Promise.all(tasks);
|
||||
});
|
||||
|
||||
test('opts.workerHistogram should be a boolean value', (t: TestContext) => {
|
||||
let index = 0;
|
||||
t.plan(1);
|
||||
t.assert.throws(() => {
|
||||
// eslint-disable-next-line no-new
|
||||
new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
concurrentTasksPerWorker: 1,
|
||||
// @ts-expect-error
|
||||
workerHistogram: 1,
|
||||
loadBalancer (_task, workers) {
|
||||
// Verify distribution to properly test this feature
|
||||
const candidate = workers[index++ % workers.length];
|
||||
const histogram = candidate.histogram;
|
||||
|
||||
t.assert.ok(!histogram);
|
||||
|
||||
if (candidate.currentUsage !== 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return candidate;
|
||||
}
|
||||
});
|
||||
}, {
|
||||
message: 'options.workerHistogram must be a boolean'
|
||||
});
|
||||
});
|
||||
+82
@@ -0,0 +1,82 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import { promisify } from 'node:util';
|
||||
import Piscina from '..';
|
||||
|
||||
const delay = promisify(setTimeout);
|
||||
|
||||
test('idle timeout will let go of threads early', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-others.ts'),
|
||||
idleTimeout: 50, // 50ms
|
||||
minThreads: 1,
|
||||
maxThreads: 2
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
const buffer = new Int32Array(new SharedArrayBuffer(4));
|
||||
|
||||
const firstTasks = [
|
||||
pool.run([buffer, 2]),
|
||||
pool.run([buffer, 2])
|
||||
];
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
const earlyThreadIds = await Promise.all(firstTasks);
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
await delay(100);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
|
||||
const secondTasks = [
|
||||
pool.run([buffer, 4]),
|
||||
pool.run([buffer, 4])
|
||||
];
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
const lateThreadIds = await Promise.all(secondTasks);
|
||||
|
||||
// One thread should have been idle in between and exited, one should have
|
||||
// been reused.
|
||||
assert.strictEqual(earlyThreadIds.length, 2);
|
||||
assert.strictEqual(lateThreadIds.length, 2);
|
||||
assert.strictEqual(new Set([...earlyThreadIds, ...lateThreadIds]).size, 3);
|
||||
});
|
||||
|
||||
test('idle timeout will not let go of threads if Infinity is used as the value', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-others.ts'),
|
||||
idleTimeout: Infinity,
|
||||
minThreads: 1,
|
||||
maxThreads: 2
|
||||
});
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
const buffer = new Int32Array(new SharedArrayBuffer(4));
|
||||
|
||||
const firstTasks = [
|
||||
pool.run([buffer, 2]),
|
||||
pool.run([buffer, 2])
|
||||
];
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
const earlyThreadIds = await Promise.all(firstTasks);
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
await delay(1000);
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
const secondTasks = [
|
||||
pool.run([buffer, 4]),
|
||||
pool.run([buffer, 4]),
|
||||
];
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
|
||||
|
||||
const lateThreadIds = await Promise.all(secondTasks);
|
||||
assert.deepStrictEqual(earlyThreadIds, lateThreadIds);
|
||||
|
||||
await Promise.all([pool.run([buffer, 6]), pool.run([buffer, 6]), pool.run([buffer, 6])]);
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
});
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
import assert from 'node:assert';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '..';
|
||||
|
||||
test('pool will maintain run and wait time histograms', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/vm.js')
|
||||
});
|
||||
|
||||
try {
|
||||
await pool.run({ payload: 'throw new Error("foo")' });
|
||||
assert.fail('Expected an error');
|
||||
} catch (error) {
|
||||
assert.strictEqual(error.message, 'foo');
|
||||
}
|
||||
});
|
||||
+23
@@ -0,0 +1,23 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
|
||||
|
||||
const importESM : (specifier : string) => Promise<any> =
|
||||
// eslint-disable-next-line no-eval
|
||||
eval('(specifier) => import(specifier)');
|
||||
|
||||
test('Piscina is default export', {}, async () => {
|
||||
assert.strictEqual((await importESM('piscina')).default, require('../'));
|
||||
});
|
||||
|
||||
test('Exports match own property names', {}, async () => {
|
||||
// Check that version, workerData, etc. are re-exported.
|
||||
const exported = new Set(Object.getOwnPropertyNames(await importESM('piscina')));
|
||||
const required = new Set(Object.getOwnPropertyNames(require('../')));
|
||||
|
||||
// Remove constructor properties + default export.
|
||||
for (const k of ['prototype', 'length', 'name']) required.delete(k);
|
||||
exported.delete('default');
|
||||
|
||||
assert.deepStrictEqual(exported, required);
|
||||
});
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import { once } from 'node:events';
|
||||
import Piscina from '..';
|
||||
|
||||
test('Pool receive message from workers', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
const messagePromise = once(pool, 'message');
|
||||
|
||||
const taskResult = pool.run(`
|
||||
require('worker_threads').parentPort.postMessage("some message");
|
||||
42
|
||||
`);
|
||||
|
||||
assert.strictEqual(await taskResult, 42);
|
||||
assert.strictEqual((await messagePromise)[0], 'some message');
|
||||
});
|
||||
+93
@@ -0,0 +1,93 @@
|
||||
import assert from 'node:assert';
|
||||
import { test } from 'node:test';
|
||||
import { types } from 'node:util';
|
||||
import { MessageChannel, MessagePort } from 'node:worker_threads';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '..';
|
||||
import {
|
||||
isMovable,
|
||||
markMovable,
|
||||
isTransferable
|
||||
} from '../dist/common';
|
||||
|
||||
const {
|
||||
transferableSymbol,
|
||||
valueSymbol
|
||||
} = Piscina;
|
||||
|
||||
test('Marking an object as movable works as expected', () => {
|
||||
const obj : any = {
|
||||
get [transferableSymbol] () : object { return {}; },
|
||||
get [valueSymbol] () : object { return {}; }
|
||||
};
|
||||
assert.ok(isTransferable(obj));
|
||||
assert.ok(!isMovable(obj)); // It's not movable initially
|
||||
markMovable(obj);
|
||||
assert.ok(isMovable(obj)); // It is movable now
|
||||
});
|
||||
|
||||
test('Marking primitives and null works as expected', () => {
|
||||
assert.strictEqual(Piscina.move(null), null);
|
||||
assert.strictEqual(Piscina.move(1 as any), 1);
|
||||
assert.strictEqual(Piscina.move(false as any), false);
|
||||
assert.strictEqual(Piscina.move('test' as any), 'test');
|
||||
});
|
||||
|
||||
test('Using Piscina.move() returns a movable object', () => {
|
||||
const obj : any = {
|
||||
get [transferableSymbol] () : object { return {}; },
|
||||
get [valueSymbol] () : object { return {}; }
|
||||
};
|
||||
assert.ok(!isMovable(obj)); // It's not movable initially
|
||||
const movable = Piscina.move(obj);
|
||||
assert.ok(isMovable(movable)); // It is movable now
|
||||
});
|
||||
|
||||
test('Using ArrayBuffer works as expected', () => {
|
||||
const ab = new ArrayBuffer(5);
|
||||
const movable = Piscina.move(ab);
|
||||
assert.ok(isMovable(movable));
|
||||
assert.ok(types.isAnyArrayBuffer(movable[valueSymbol]));
|
||||
assert.ok(types.isAnyArrayBuffer(movable[transferableSymbol]));
|
||||
assert.strictEqual(movable[transferableSymbol], ab);
|
||||
});
|
||||
|
||||
test('Using TypedArray works as expected', () => {
|
||||
const ab = new Uint8Array(5);
|
||||
const movable = Piscina.move(ab);
|
||||
assert.ok(isMovable(movable));
|
||||
assert.ok((types as any).isArrayBufferView(movable[valueSymbol]));
|
||||
assert.ok(types.isAnyArrayBuffer(movable[transferableSymbol]));
|
||||
assert.strictEqual(movable[transferableSymbol], ab.buffer);
|
||||
});
|
||||
|
||||
test('Using MessagePort works as expected', () => {
|
||||
const mc = new MessageChannel();
|
||||
const movable = Piscina.move(mc.port1);
|
||||
assert.ok(isMovable(movable));
|
||||
assert.ok(movable[valueSymbol] instanceof MessagePort);
|
||||
assert.ok(movable[transferableSymbol] instanceof MessagePort);
|
||||
assert.strictEqual(movable[transferableSymbol], mc.port1);
|
||||
});
|
||||
|
||||
test('Moving works', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/move.ts')
|
||||
});
|
||||
|
||||
{
|
||||
// Test with empty transferList
|
||||
const ab = new ArrayBuffer(10);
|
||||
const ret = await pool.run(Piscina.move(ab));
|
||||
assert.strictEqual(ab.byteLength, 0); // It was moved
|
||||
assert.ok(types.isAnyArrayBuffer(ret));
|
||||
}
|
||||
|
||||
{
|
||||
// Test with empty transferList
|
||||
const ab = new ArrayBuffer(10);
|
||||
const ret = await pool.run(Piscina.move(ab), { transferList: [] });
|
||||
assert.strictEqual(ab.byteLength, 0); // It was moved
|
||||
assert.ok(types.isAnyArrayBuffer(ret));
|
||||
}
|
||||
});
|
||||
+50
@@ -0,0 +1,50 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { resolve } from 'node:path';
|
||||
import { test } from 'node:test';
|
||||
import { getCurrentProcessPriority, WindowsThreadPriority } from '@napi-rs/nice';
|
||||
import Piscina from '..';
|
||||
|
||||
test('niceness - Linux:', { skip: process.platform !== 'linux' }, async scope => {
|
||||
scope.plan(2);
|
||||
|
||||
await scope.test('can set niceness for threads on Linux', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
niceIncrement: 5
|
||||
});
|
||||
|
||||
// ts-ignore because the dependency is not installed on Windows.
|
||||
// @ts-ignore
|
||||
const currentNiceness = getCurrentProcessPriority();
|
||||
const result = await worker.run('require("@napi-rs/nice").getCurrentProcessPriority()');
|
||||
// niceness is capped to 19 on Linux.
|
||||
const expected = Math.min(currentNiceness + 5, 19);
|
||||
assert.strictEqual(result, expected);
|
||||
});
|
||||
|
||||
await scope.test('setting niceness never does anything bad', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
niceIncrement: 5
|
||||
});
|
||||
|
||||
const result = await worker.run('42');
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
});
|
||||
|
||||
test('niceness - Windows', {
|
||||
skip: process.platform !== 'win32'
|
||||
}, scope => {
|
||||
scope.plan(1);
|
||||
scope.test('can set niceness for threads on Windows', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
niceIncrement: WindowsThreadPriority.ThreadPriorityAboveNormal
|
||||
});
|
||||
|
||||
const result = await worker.run('require("@napi-rs/nice").getCurrentProcessPriority()');
|
||||
|
||||
assert.strictEqual(result, WindowsThreadPriority.ThreadPriorityAboveNormal);
|
||||
});
|
||||
});
|
||||
+132
@@ -0,0 +1,132 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import Piscina from '..';
|
||||
|
||||
test('filename cannot be non-null/non-string',() => {
|
||||
assert.throws(() => new Piscina(({
|
||||
filename: 12
|
||||
}) as any), /options.filename must be a string or null/);
|
||||
});
|
||||
|
||||
test('name cannot be non-null/non-string', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
name: 12
|
||||
}) as any), /options.name must be a string or null/);
|
||||
});
|
||||
|
||||
test('minThreads must be non-negative integer', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
minThreads: -1
|
||||
}) as any), /options.minThreads must be a non-negative integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
minThreads: 'string'
|
||||
}) as any), /options.minThreads must be a non-negative integer/);
|
||||
});
|
||||
|
||||
test('maxThreads must be positive integer', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
maxThreads: -1
|
||||
}) as any), /options.maxThreads must be a positive integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
maxThreads: 0
|
||||
}) as any), /options.maxThreads must be a positive integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
maxThreads: 'string'
|
||||
}) as any), /options.maxThreads must be a positive integer/);
|
||||
});
|
||||
|
||||
test('concurrentTasksPerWorker must be positive integer', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
concurrentTasksPerWorker: -1
|
||||
}) as any), /options.concurrentTasksPerWorker must be a positive integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
concurrentTasksPerWorker: 0
|
||||
}) as any), /options.concurrentTasksPerWorker must be a positive integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
concurrentTasksPerWorker: 'string'
|
||||
}) as any), /options.concurrentTasksPerWorker must be a positive integer/);
|
||||
});
|
||||
|
||||
test('idleTimeout must be non-negative integer', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
idleTimeout: -1
|
||||
}) as any), /options.idleTimeout must be a non-negative integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
idleTimeout: 'string'
|
||||
}) as any), /options.idleTimeout must be a non-negative integer/);
|
||||
});
|
||||
|
||||
test('maxQueue must be non-negative integer', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
maxQueue: -1
|
||||
}) as any), /options.maxQueue must be a non-negative integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
maxQueue: 'string'
|
||||
}) as any), /options.maxQueue must be a non-negative integer/);
|
||||
|
||||
const p = new Piscina({ maxQueue: 'auto', maxThreads: 2 });
|
||||
assert.strictEqual(p.options.maxQueue, 4);
|
||||
});
|
||||
|
||||
test('atomics must be valid', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
atomics: -1
|
||||
}) as any), /options.atomics should be a value of sync, sync or disabled./);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
atomics: 'string'
|
||||
}) as any), /options.atomics should be a value of sync, sync or disabled./);
|
||||
});
|
||||
|
||||
test('resourceLimits must be an object', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
resourceLimits: 0
|
||||
}) as any), /options.resourceLimits must be an object/);
|
||||
});
|
||||
|
||||
test('taskQueue must be a TaskQueue object', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
taskQueue: 0
|
||||
}) as any), /options.taskQueue must be a TaskQueue object/);
|
||||
assert.throws(() => new Piscina(({
|
||||
taskQueue: 'test'
|
||||
}) as any), /options.taskQueue must be a TaskQueue object/);
|
||||
assert.throws(() => new Piscina(({
|
||||
taskQueue: null
|
||||
}) as any), /options.taskQueue must be a TaskQueue object/);
|
||||
assert.throws(() => new Piscina(({
|
||||
taskQueue: new Date()
|
||||
}) as any), /options.taskQueue must be a TaskQueue object/);
|
||||
assert.throws(() => new Piscina(({
|
||||
taskQueue: { } as any
|
||||
}) as any), /options.taskQueue must be a TaskQueue object/);
|
||||
});
|
||||
|
||||
test('niceIncrement must be non-negative integer on Unix', {
|
||||
skip: process.platform === 'win32' ? 'Unix options validate' : false
|
||||
}, () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
niceIncrement: -1
|
||||
}) as any), /options.niceIncrement must be a non-negative integer/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
niceIncrement: 'string'
|
||||
}) as any), /options.niceIncrement must be a non-negative integer/);
|
||||
});
|
||||
|
||||
test('trackUnmanagedFds must be a boolean', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
trackUnmanagedFds: -1
|
||||
}) as any), /options.trackUnmanagedFds must be a boolean/);
|
||||
|
||||
assert.throws(() => new Piscina(({
|
||||
trackUnmanagedFds: 'string'
|
||||
}) as any), /options.trackUnmanagedFds must be a boolean/);
|
||||
});
|
||||
+84
@@ -0,0 +1,84 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { once } from 'node:events';
|
||||
import { resolve } from 'node:path';
|
||||
import { describe, it, test } from 'node:test';
|
||||
import Piscina from '..';
|
||||
|
||||
describe('close()', () => {
|
||||
it('no pending tasks', async () => {
|
||||
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js') });
|
||||
await pool.close();
|
||||
assert.ok('pool closed successfully');
|
||||
});
|
||||
|
||||
it('no pending tasks (with minThreads=0)', async () => {
|
||||
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), minThreads: 0 });
|
||||
await pool.close();
|
||||
assert.ok('pool closed successfully');
|
||||
});
|
||||
});
|
||||
|
||||
test('queued tasks waits for all tasks to complete', async () => {
|
||||
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
|
||||
|
||||
const task1 = pool.run({ time: 100 });
|
||||
const task2 = pool.run({ time: 100 });
|
||||
setImmediate(() => assert.doesNotReject(pool.close(), 'close is resolved when all running tasks are completed'));
|
||||
await Promise.all([
|
||||
assert.doesNotReject(once(pool, 'close'), 'handler is called when pool is closed'),
|
||||
assert.doesNotReject(task1, 'complete running task'),
|
||||
assert.doesNotReject(task2, 'complete running task')
|
||||
]);
|
||||
});
|
||||
|
||||
test('abort any task enqueued during closing up', async () => {
|
||||
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
|
||||
|
||||
setImmediate(() => {
|
||||
assert.doesNotReject(pool.close(), 'close is resolved when running tasks are completed');
|
||||
assert.doesNotReject(pool.run({ time: 1000 }).then(null, err => {
|
||||
assert.strictEqual(err.message, 'The task has been aborted');
|
||||
assert.strictEqual(err.cause, 'queue is being terminated');
|
||||
}));
|
||||
});
|
||||
|
||||
await assert.doesNotReject(pool.run({ time: 100 }), 'complete running task');
|
||||
});
|
||||
|
||||
test('force: queued tasks waits for all tasks already running and aborts tasks that are not started yet', async () => {
|
||||
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 2 });
|
||||
|
||||
const task1 = pool.run({ time: 5 });
|
||||
const task2 = pool.run({ time: 5 });
|
||||
// const task3 = pool.run({ time: 100 });
|
||||
// const task4 = pool.run({ time: 100 });
|
||||
|
||||
assert.doesNotReject(pool.close({ force: true }));
|
||||
assert.doesNotReject(once(pool, 'close'), 'handler is called when pool is closed');
|
||||
assert.doesNotReject(task1, 'complete running task');
|
||||
assert.doesNotReject(task2, 'complete running task');
|
||||
assert.rejects(pool.run({ time: 100 }), /The task has been aborted/, 'abort task that are not started yet');
|
||||
assert.rejects(pool.run({ time: 100 }), /The task has been aborted/, 'abort task that are not started yet');
|
||||
|
||||
await task1;
|
||||
await task2;
|
||||
});
|
||||
|
||||
test('timed out close operation destroys the pool', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/sleep.js'),
|
||||
maxThreads: 1,
|
||||
closeTimeout: 5 // 5ms
|
||||
});
|
||||
|
||||
const task1 = pool.run({ time: 50 });
|
||||
const task2 = pool.run({ time: 50 });
|
||||
|
||||
setImmediate(() => assert.doesNotReject(pool.close(), 'close is resolved on timeout'));
|
||||
|
||||
await Promise.all([
|
||||
assert.doesNotReject(once(pool, 'error'), 'error handler is called on timeout'),
|
||||
assert.rejects(task1, /Terminating worker thread/, 'task is aborted due to timeout'),
|
||||
assert.rejects(task2, /Terminating worker thread/, 'task is aborted due to timeout')
|
||||
]);
|
||||
});
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '..';
|
||||
|
||||
test('can destroy pool while tasks are running', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
setImmediate(() => pool.destroy());
|
||||
|
||||
assert.rejects(pool.run('while(1){}'), /Terminating worker thread/);
|
||||
});
|
||||
+127
@@ -0,0 +1,127 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { resolve } from 'node:path';
|
||||
import { test } from 'node:test';
|
||||
import { once } from 'node:events';
|
||||
import Piscina from '../dist';
|
||||
|
||||
const nodeVersion = Number(process.versions.node.split('.')[0])
|
||||
|
||||
test('workerCreate/workerDestroy should be emitted while managing worker lifecycle', async () => {
|
||||
let index = 0;
|
||||
// Its expected to have one task get balanced twice due to the load balancer distribution
|
||||
// first task enters, its distributed; second is enqueued, once first is done, second is distributed and normalizes
|
||||
let newWorkers = 0;
|
||||
let destroyedWorkers = 0;
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 3,
|
||||
minThreads: 3,
|
||||
concurrentTasksPerWorker: 1,
|
||||
loadBalancer (_task, workers) {
|
||||
// Verify distribution to properly test this feature
|
||||
const candidate = workers[index++ % workers.length];
|
||||
if (candidate != null && candidate.currentUsage >= 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return candidate;
|
||||
}
|
||||
});
|
||||
|
||||
pool.on('workerCreate', () => {
|
||||
newWorkers++;
|
||||
});
|
||||
|
||||
pool.on('workerDestroy', () => {
|
||||
destroyedWorkers++;
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
const controller = new AbortController();
|
||||
const signal = controller.signal;
|
||||
tasks.push(pool.run('while (true) {}', {
|
||||
signal
|
||||
}));
|
||||
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 5))'));
|
||||
}
|
||||
|
||||
controller.abort();
|
||||
await Promise.allSettled(tasks);
|
||||
await pool.close();
|
||||
assert.strictEqual(destroyedWorkers, 4);
|
||||
assert.strictEqual(newWorkers, 4);
|
||||
});
|
||||
|
||||
test('Explicit resource management (dispose)', { skip: nodeVersion !== 24 }, async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
minThreads: 1,
|
||||
concurrentTasksPerWorker: 1,
|
||||
});
|
||||
|
||||
{
|
||||
using pool = piscina;
|
||||
const tasks = [];
|
||||
;
|
||||
pool.once('close', () => {
|
||||
assert.ok(true);
|
||||
});
|
||||
|
||||
for (let n = 0; n < 10; n++) {
|
||||
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 5))'));
|
||||
}
|
||||
}
|
||||
|
||||
await once(piscina, 'close');
|
||||
});
|
||||
|
||||
test('Explicit resource management (asyncDispose)', { skip: nodeVersion !== 24 }, async () => {
|
||||
const piscina = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
minThreads: 1,
|
||||
concurrentTasksPerWorker: 1,
|
||||
});
|
||||
|
||||
{
|
||||
await using pool = piscina;
|
||||
const tasks = [];
|
||||
|
||||
pool.once('close', () => {
|
||||
assert.ok(true);
|
||||
});
|
||||
|
||||
for (let n = 0; n < 10; n++) {
|
||||
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 5))'));
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
test('#805 - Concurrent Aborts', async (t) => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
minThreads: 1,
|
||||
concurrentTasksPerWorker: 1,
|
||||
});
|
||||
|
||||
t.after(() => pool.close());
|
||||
|
||||
const tasks = [];
|
||||
const controller = new AbortController();
|
||||
const controller2 = new AbortController();
|
||||
const controller3 = new AbortController();
|
||||
|
||||
tasks.push(assert.rejects(pool.run('new Promise(resolve => setTimeout(resolve, 5))', { signal: controller.signal })));
|
||||
tasks.push(assert.rejects(pool.run('new Promise(resolve => setTimeout(resolve, 5))', { signal: controller2.signal })));
|
||||
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 5))', { signal: controller3.signal }));
|
||||
|
||||
|
||||
controller.abort();
|
||||
controller2.abort();
|
||||
|
||||
await Promise.all(tasks);
|
||||
});
|
||||
+181
@@ -0,0 +1,181 @@
|
||||
import assert from 'node:assert';
|
||||
import { test, TestContext } from 'node:test';
|
||||
import { MessageChannel } from 'node:worker_threads';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '..';
|
||||
import { getAvailableParallelism } from '../dist/common';
|
||||
|
||||
test('postTask() can transfer ArrayBuffer instances', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts')
|
||||
});
|
||||
|
||||
const ab = new ArrayBuffer(40);
|
||||
await pool.run({ ab }, { transferList: [ab] });
|
||||
assert.strictEqual(pool.completed, 1);
|
||||
assert.strictEqual(ab.byteLength, 0);
|
||||
});
|
||||
|
||||
test('postTask() cannot clone build-in objects', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts')
|
||||
});
|
||||
|
||||
const obj = new MessageChannel().port1;
|
||||
assert.rejects(pool.run({ obj }));
|
||||
});
|
||||
|
||||
test('postTask() resolves with a rejection when the handler rejects', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.rejects(pool.run('Promise.reject(new Error("foo"))'), /foo/);
|
||||
});
|
||||
|
||||
test('postTask() resolves with a rejection when the handler throws', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.rejects(pool.run('throw new Error("foo")'), /foo/);
|
||||
});
|
||||
|
||||
test('postTask() validates transferList', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.rejects(pool.run('0', { transferList: 42 as any }),
|
||||
/transferList argument must be an Array/);
|
||||
});
|
||||
|
||||
test('postTask() validates filename', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.rejects(pool.run('0', { filename: 42 as any }),
|
||||
/filename argument must be a string/);
|
||||
});
|
||||
|
||||
test('postTask() validates name', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.rejects(pool.run('0', { name: 42 as any }),
|
||||
/name argument must be a string/);
|
||||
});
|
||||
|
||||
test('postTask() validates abortSignal', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.rejects(pool.run('0', { signal: 42 as any }),
|
||||
/signal argument must be an object/);
|
||||
});
|
||||
|
||||
test('Piscina emits drain', async (t: TestContext) => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1
|
||||
});
|
||||
|
||||
t.plan(2);
|
||||
|
||||
pool.on('drain', () => {
|
||||
t.assert.ok(true);
|
||||
t.assert.ok(!pool.needsDrain);
|
||||
});
|
||||
|
||||
await Promise.all([pool.run('123'), pool.run('123'), pool.run('123')]);
|
||||
});
|
||||
|
||||
test('Piscina exposes/emits needsDrain to true when capacity is exceeded', async (t: TestContext) => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxQueue: 3,
|
||||
maxThreads: 1
|
||||
});
|
||||
|
||||
t.plan(2);
|
||||
|
||||
pool.once('drain', () => {
|
||||
t.assert.ok(true);
|
||||
});
|
||||
pool.once('needsDrain', () => {
|
||||
t.assert.ok(pool.needsDrain);
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
pool.run('123'),
|
||||
pool.run('123'),
|
||||
pool.run('123'),
|
||||
pool.run('123')
|
||||
]);
|
||||
});
|
||||
|
||||
test('Piscina can use async loaded workers', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval-async.js')
|
||||
});
|
||||
|
||||
assert.strictEqual(await pool.run('1'), 1);
|
||||
});
|
||||
|
||||
test('Piscina can use async loaded esm workers', {}, async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/esm-async.mjs')
|
||||
});
|
||||
|
||||
assert.strictEqual(await pool.run('1'), 1);
|
||||
});
|
||||
|
||||
test('Piscina.run options is correct type', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.rejects(pool.run(42, 1 as any), /options must be an object/);
|
||||
});
|
||||
|
||||
test('Piscina.maxThreads should return the max number of threads to be used (default)', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
const maxThreads = getAvailableParallelism() * 1.5;
|
||||
|
||||
assert.strictEqual(pool.maxThreads, maxThreads);
|
||||
});
|
||||
|
||||
test('Piscina.minThreads should return the max number of threads to be used (custom)', () => {
|
||||
const maxThreads = 3;
|
||||
const pool = new Piscina({
|
||||
maxThreads,
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.maxThreads, maxThreads);
|
||||
});
|
||||
|
||||
test('Piscina.minThreads should return the max number of threads to be used (default)', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
const minThreads = Math.max(Math.floor(getAvailableParallelism() / 2), 1);
|
||||
|
||||
assert.strictEqual(pool.minThreads, minThreads);
|
||||
});
|
||||
|
||||
test('Piscina.minThreads should return the max number of threads to be used (custom)', () => {
|
||||
const minThreads = 2;
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.minThreads, minThreads);
|
||||
});
|
||||
+236
@@ -0,0 +1,236 @@
|
||||
import assert from 'node:assert';
|
||||
import { test } from 'node:test';
|
||||
import { pathToFileURL } from 'node:url';
|
||||
import { resolve } from 'node:path';
|
||||
import { EventEmitter } from 'node:events';
|
||||
import Piscina from '..';
|
||||
import { version } from '../package.json';
|
||||
|
||||
test('Piscina is exposed on export', () => {
|
||||
assert.strictEqual(Piscina.version, version);
|
||||
});
|
||||
|
||||
test('Piscina is exposed on itself', () => {
|
||||
assert.strictEqual(Piscina.Piscina, Piscina);
|
||||
});
|
||||
|
||||
test('Piscina.isWorkerThread has the correct value', () => {
|
||||
assert.strictEqual(Piscina.isWorkerThread, false);
|
||||
});
|
||||
|
||||
test('Piscina.isWorkerThread has the correct value (worker)', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts')
|
||||
});
|
||||
const result = await worker.run(null);
|
||||
assert.strictEqual(result, 'done');
|
||||
});
|
||||
|
||||
test('Piscina.isWorkerThread has the correct value (worker) with named import', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-isworkerthread-named-import.ts')
|
||||
});
|
||||
const result = await worker.run(null);
|
||||
assert.strictEqual(result, 'done');
|
||||
});
|
||||
|
||||
test('Piscina.isWorkerThread has the correct value (worker) with named import', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-isworkerthread-named-import.ts')
|
||||
});
|
||||
const result = await worker.run(null);
|
||||
assert.strictEqual(result, 'done');
|
||||
});
|
||||
|
||||
test('Piscina instance is an EventEmitter', async () => {
|
||||
const piscina = new Piscina();
|
||||
assert.ok(piscina instanceof EventEmitter);
|
||||
});
|
||||
|
||||
test('Piscina constructor options are correctly set', async () => {
|
||||
const piscina = new Piscina({
|
||||
minThreads: 10,
|
||||
maxThreads: 20,
|
||||
maxQueue: 30
|
||||
});
|
||||
|
||||
assert.strictEqual(piscina.options.minThreads, 10);
|
||||
assert.strictEqual(piscina.options.maxThreads, 20);
|
||||
assert.strictEqual(piscina.options.maxQueue, 30);
|
||||
});
|
||||
|
||||
test('trivial eval() handler works', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
const result = await worker.run('42');
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('async eval() handler works', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
const result = await worker.run('Promise.resolve(42)');
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('filename can be provided while posting', async () => {
|
||||
const worker = new Piscina();
|
||||
const result = await worker.run(
|
||||
'Promise.resolve(42)',
|
||||
{ filename: resolve(__dirname, 'fixtures/eval.js') });
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('filename can be null when initially provided', async () => {
|
||||
const worker = new Piscina({ filename: null });
|
||||
const result = await worker.run(
|
||||
'Promise.resolve(42)',
|
||||
{ filename: resolve(__dirname, 'fixtures/eval.js') });
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('filename must be provided while posting', () => {
|
||||
const worker = new Piscina();
|
||||
assert.rejects(worker.run('doesn’t matter'),
|
||||
/filename must be provided to run\(\) or in options object/);
|
||||
});
|
||||
|
||||
test('passing env to workers works', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
env: { A: 'foo' }
|
||||
});
|
||||
|
||||
const env = await pool.run('({...process.env})');
|
||||
assert.deepStrictEqual(env, { A: 'foo' });
|
||||
});
|
||||
|
||||
test('passing argv to workers works', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
argv: ['a', 'b', 'c']
|
||||
});
|
||||
|
||||
const env = await pool.run('process.argv.slice(2)');
|
||||
assert.deepStrictEqual(env, ['a', 'b', 'c']);
|
||||
});
|
||||
|
||||
test('passing execArgv to workers works', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
execArgv: ['--no-warnings']
|
||||
});
|
||||
|
||||
const env = await pool.run('process.execArgv');
|
||||
assert.deepStrictEqual(env, ['--no-warnings']);
|
||||
});
|
||||
|
||||
test('passing valid workerData works', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-workerdata.ts'),
|
||||
workerData: 'ABC'
|
||||
});
|
||||
assert.strictEqual(Piscina.workerData, undefined);
|
||||
|
||||
await pool.run(null);
|
||||
});
|
||||
|
||||
test('passing valid workerData works with named import', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-workerdata-named-import.ts'),
|
||||
workerData: 'ABC'
|
||||
});
|
||||
assert.strictEqual(Piscina.workerData, undefined);
|
||||
|
||||
await pool.run(null);
|
||||
});
|
||||
|
||||
test('passing valid workerData works with named import', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/simple-workerdata-named-import.ts'),
|
||||
workerData: 'ABC'
|
||||
});
|
||||
assert.strictEqual(Piscina.workerData, undefined);
|
||||
|
||||
await pool.run(null);
|
||||
});
|
||||
|
||||
test('passing invalid workerData does not work', () => {
|
||||
assert.throws(() => new Piscina(({
|
||||
filename: resolve(__dirname, 'fixtures/simple-workerdata.ts'),
|
||||
workerData: {
|
||||
hello () {}
|
||||
}
|
||||
})), /could not be cloned./);
|
||||
});
|
||||
|
||||
test('filename can be a file:// URL', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: pathToFileURL(resolve(__dirname, 'fixtures/eval.js')).href
|
||||
});
|
||||
const result = await worker.run('42');
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('filename can be a file:// URL to an ESM module', {}, async () => {
|
||||
const worker = new Piscina({
|
||||
filename: pathToFileURL(resolve(__dirname, 'fixtures/esm-export.mjs')).href
|
||||
});
|
||||
const result = await worker.run('42');
|
||||
assert.strictEqual(result, 42);
|
||||
});
|
||||
|
||||
test('duration and utilization calculations work', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
// Initial utilization is always 0
|
||||
assert.strictEqual(worker.utilization, 0);
|
||||
|
||||
await Promise.all([
|
||||
worker.run('42'),
|
||||
worker.run('41'),
|
||||
worker.run('40')
|
||||
]);
|
||||
|
||||
// utilization is going to be some non-deterministic value
|
||||
// between 0 and 1. It should not be zero at this point
|
||||
// because tasks have run, but it should also never be 1
|
||||
assert.ok(worker.utilization > 0);
|
||||
assert.ok(worker.utilization < 1);
|
||||
|
||||
// Duration must be non-zero.
|
||||
assert.ok(worker.duration > 0);
|
||||
});
|
||||
|
||||
test('run works also', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
|
||||
await worker.run(42);
|
||||
});
|
||||
|
||||
test('named tasks work', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/multiple.js')
|
||||
});
|
||||
|
||||
assert.strictEqual(await worker.run({}, { name: 'a' }), 'a');
|
||||
assert.strictEqual(await worker.run({}, { name: 'b' }), 'b');
|
||||
assert.strictEqual(await worker.run({}), 'a');
|
||||
});
|
||||
|
||||
test('named tasks work', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/multiple.js'),
|
||||
name: 'b'
|
||||
});
|
||||
|
||||
assert.strictEqual(await worker.run({}, { name: 'a' }), 'a');
|
||||
assert.strictEqual(await worker.run({}, { name: 'b' }), 'b');
|
||||
assert.strictEqual(await worker.run({}), 'b');
|
||||
});
|
||||
+281
@@ -0,0 +1,281 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina, { PiscinaTask, TaskQueue } from '..';
|
||||
|
||||
test('will put items into a task queue until they can run', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
|
||||
minThreads: 2,
|
||||
maxThreads: 3
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
const buffers = [
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4))
|
||||
];
|
||||
|
||||
const results = [];
|
||||
|
||||
results.push(pool.run(buffers[0]));
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
results.push(pool.run(buffers[1]));
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
results.push(pool.run(buffers[2]));
|
||||
assert.strictEqual(pool.threads.length, 3);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
results.push(pool.run(buffers[3]));
|
||||
assert.strictEqual(pool.threads.length, 3);
|
||||
assert.strictEqual(pool.queueSize, 1);
|
||||
|
||||
for (const buffer of buffers) {
|
||||
Atomics.store(buffer, 0, 1);
|
||||
Atomics.notify(buffer, 0, 1);
|
||||
}
|
||||
|
||||
await results[0];
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
await Promise.all(results);
|
||||
});
|
||||
|
||||
test('will reject items over task queue limit', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 0,
|
||||
maxThreads: 1,
|
||||
maxQueue: 2
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 0);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /Terminating worker thread/);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /Terminating worker thread/);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 1);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /Terminating worker thread/);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 2);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /Task queue is at limit/);
|
||||
await pool.destroy();
|
||||
});
|
||||
|
||||
test('will reject items when task queue is unavailable', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 0,
|
||||
maxThreads: 1,
|
||||
maxQueue: 0
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 0);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /Terminating worker thread/);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /No task queue available and all Workers are busy/);
|
||||
await pool.destroy();
|
||||
});
|
||||
|
||||
test('will reject items when task queue is unavailable (fixed thread count)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 1,
|
||||
maxThreads: 1,
|
||||
maxQueue: 0
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /Terminating worker thread/);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run('while (true) {}'), /No task queue available and all Workers are busy/);
|
||||
await pool.destroy();
|
||||
});
|
||||
|
||||
test('tasks can share a Worker if requested (both tests blocking)', async (t) => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
|
||||
minThreads: 0,
|
||||
maxThreads: 1,
|
||||
maxQueue: 0,
|
||||
concurrentTasksPerWorker: 2
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 0);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run(new Int32Array(new SharedArrayBuffer(4))));
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run(new Int32Array(new SharedArrayBuffer(4))));
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
await pool.destroy();
|
||||
});
|
||||
|
||||
test('tasks can share a Worker if requested (one test finishes)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
|
||||
minThreads: 0,
|
||||
maxThreads: 1,
|
||||
maxQueue: 0,
|
||||
concurrentTasksPerWorker: 2
|
||||
});
|
||||
|
||||
const buffers = [
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4))
|
||||
];
|
||||
|
||||
assert.strictEqual(pool.threads.length, 0);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
const firstTask = pool.run(buffers[0]);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
assert.rejects(pool.run(
|
||||
'new Promise((resolve) => setTimeout(resolve, 5))',
|
||||
{ filename: resolve(__dirname, 'fixtures/eval.js') })
|
||||
, /Terminating worker thread/);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
Atomics.store(buffers[0], 0, 1);
|
||||
Atomics.notify(buffers[0], 0, 1);
|
||||
|
||||
await firstTask;
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
await pool.destroy();
|
||||
});
|
||||
|
||||
test('tasks can share a Worker if requested (both tests finish)', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
|
||||
minThreads: 1,
|
||||
maxThreads: 1,
|
||||
maxQueue: 0,
|
||||
concurrentTasksPerWorker: 2
|
||||
});
|
||||
|
||||
const buffers = [
|
||||
new Int32Array(new SharedArrayBuffer(4)),
|
||||
new Int32Array(new SharedArrayBuffer(4))
|
||||
];
|
||||
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
const firstTask = pool.run(buffers[0]);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
const secondTask = pool.run(buffers[1]);
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
|
||||
Atomics.store(buffers[0], 0, 1);
|
||||
Atomics.store(buffers[1], 0, 1);
|
||||
Atomics.notify(buffers[0], 0, 1);
|
||||
Atomics.notify(buffers[1], 0, 1);
|
||||
Atomics.wait(buffers[0], 0, 1);
|
||||
Atomics.wait(buffers[1], 0, 1);
|
||||
|
||||
await firstTask;
|
||||
assert.strictEqual(buffers[0][0], -1);
|
||||
await secondTask;
|
||||
assert.strictEqual(buffers[1][0], -1);
|
||||
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.queueSize, 0);
|
||||
});
|
||||
|
||||
test('custom task queue works', async () => {
|
||||
let sizeCalled : boolean = false;
|
||||
let shiftCalled : boolean = false;
|
||||
let pushCalled : boolean = false;
|
||||
|
||||
class CustomTaskPool implements TaskQueue {
|
||||
tasks: PiscinaTask[] = [];
|
||||
|
||||
get size () : number {
|
||||
sizeCalled = true;
|
||||
return this.tasks.length;
|
||||
}
|
||||
|
||||
shift () : PiscinaTask | null {
|
||||
shiftCalled = true;
|
||||
return this.tasks.length > 0 ? this.tasks.shift() as PiscinaTask : null;
|
||||
}
|
||||
|
||||
push (task : PiscinaTask) : void {
|
||||
pushCalled = true;
|
||||
this.tasks.push(task);
|
||||
|
||||
assert.ok(Piscina.queueOptionsSymbol in task);
|
||||
if ((task as any).task.a === 3) {
|
||||
assert.strictEqual(task[Piscina.queueOptionsSymbol], null);
|
||||
} else {
|
||||
assert.strictEqual(task[Piscina.queueOptionsSymbol].option,
|
||||
(task as any).task.a);
|
||||
}
|
||||
}
|
||||
|
||||
remove (task : PiscinaTask) : void {
|
||||
const index = this.tasks.indexOf(task);
|
||||
this.tasks.splice(index, 1);
|
||||
}
|
||||
};
|
||||
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
taskQueue: new CustomTaskPool(),
|
||||
// Setting maxThreads low enough to ensure we queue
|
||||
maxThreads: 1,
|
||||
minThreads: 1
|
||||
});
|
||||
|
||||
function makeTask (task, option) {
|
||||
return { ...task, [Piscina.queueOptionsSymbol]: { option } };
|
||||
}
|
||||
|
||||
const ret = await Promise.all([
|
||||
pool.run(makeTask({ a: 1 }, 1)),
|
||||
pool.run(makeTask({ a: 2 }, 2)),
|
||||
pool.run({ a: 3 }) // No queueOptionsSymbol attached
|
||||
]);
|
||||
|
||||
assert.strictEqual(ret[0].a, 1);
|
||||
assert.strictEqual(ret[1].a, 2);
|
||||
assert.strictEqual(ret[2].a, 3);
|
||||
|
||||
assert.ok(sizeCalled);
|
||||
assert.ok(pushCalled);
|
||||
assert.ok(shiftCalled);
|
||||
});
|
||||
+36
@@ -0,0 +1,36 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '..';
|
||||
|
||||
function wait () {
|
||||
// Timeout here should be little bit longer
|
||||
// than in the worker timeout
|
||||
// to ensure there are no flaky tests
|
||||
return new Promise((resolve) => setTimeout(resolve, 10));
|
||||
}
|
||||
|
||||
test('transferable objects must be transferred', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/send-buffer-then-get-length.js'),
|
||||
atomics: 'disabled'
|
||||
});
|
||||
await pool.run({}, { name: 'send' });
|
||||
await wait();
|
||||
const after = await pool.run({}, { name: 'get' });
|
||||
assert.strictEqual(after, 0);
|
||||
});
|
||||
|
||||
test('objects that implement transferable must be transferred', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(
|
||||
__dirname,
|
||||
'fixtures/send-transferrable-then-get-length.js'
|
||||
),
|
||||
atomics: 'disabled'
|
||||
});
|
||||
await pool.run({}, { name: 'send' });
|
||||
await wait();
|
||||
const after = await pool.run({}, { name: 'get' });
|
||||
assert.strictEqual(after, 0);
|
||||
});
|
||||
+35
@@ -0,0 +1,35 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '..';
|
||||
|
||||
test('resourceLimits causes task to reject', async () => {
|
||||
const worker = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/resource-limits.js'),
|
||||
resourceLimits: {
|
||||
maxOldGenerationSizeMb: 16,
|
||||
maxYoungGenerationSizeMb: 4,
|
||||
codeRangeSizeMb: 16
|
||||
}
|
||||
});
|
||||
worker.on('error', () => {
|
||||
// Ignore any additional errors that may occur.
|
||||
// This may happen because when the Worker is
|
||||
// killed a new worker is created that may hit
|
||||
// the memory limits immediately. When that
|
||||
// happens, there is no associated Promise to
|
||||
// reject so we emit an error event instead.
|
||||
// We don't care so much about that here. We
|
||||
// could potentially avoid the issue by setting
|
||||
// higher limits above but rather than try to
|
||||
// guess at limits that may work consistently,
|
||||
// let's just ignore the additional error for
|
||||
// now.
|
||||
});
|
||||
const limits : any = worker.options.resourceLimits;
|
||||
assert.strictEqual(limits.maxOldGenerationSizeMb, 16);
|
||||
assert.strictEqual(limits.maxYoungGenerationSizeMb, 4);
|
||||
assert.strictEqual(limits.codeRangeSizeMb, 16);
|
||||
assert.rejects(worker.run(null),
|
||||
/Worker terminated due to reaching memory limit: JS heap out of memory/);
|
||||
});
|
||||
+87
@@ -0,0 +1,87 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import { once } from 'node:events';
|
||||
import Piscina from '..';
|
||||
|
||||
test('uncaught exception resets Worker', ()=> {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
assert.rejects(pool.run('throw new Error("not_caught")'), /not_caught/);
|
||||
});
|
||||
|
||||
test('uncaught exception in immediate resets Worker', ()=> {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js')
|
||||
});
|
||||
assert.rejects(
|
||||
pool.run(`
|
||||
setImmediate(() => { throw new Error("not_caught") });
|
||||
new Promise(() => {}) /* act as if we were doing some work */
|
||||
`), /not_caught/);
|
||||
});
|
||||
|
||||
test('uncaught exception in immediate after task yields error event', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1,
|
||||
atomics: 'disabled'
|
||||
});
|
||||
|
||||
const errorEvent : Promise<Error[]> = once(pool, 'error');
|
||||
|
||||
const taskResult = pool.run(`
|
||||
setTimeout(() => { throw new Error("not_caught") }, 5);
|
||||
42
|
||||
`);
|
||||
|
||||
assert.strictEqual(await taskResult, 42);
|
||||
|
||||
// Hack a bit to make sure we get the 'exit'/'error' events.
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
pool.threads[0].ref();
|
||||
|
||||
// This is the main assertion here.
|
||||
assert.strictEqual((await errorEvent)[0].message, 'not_caught');
|
||||
});
|
||||
|
||||
test('exiting process resets worker', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 1
|
||||
});
|
||||
const originalThreadId = pool.threads[0].threadId;
|
||||
await assert.rejects(pool.run('process.exit(1);'), /worker exited with code: 1/);
|
||||
const newThreadId = pool.threads[0].threadId;
|
||||
assert.notStrictEqual(originalThreadId, newThreadId);
|
||||
});
|
||||
|
||||
test('exiting process in immediate after task errors next task and resets worker', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval-async.js'),
|
||||
minThreads: 1
|
||||
});
|
||||
|
||||
const originalThreadId = pool.threads[0].threadId;
|
||||
const taskResult = await pool.run(`
|
||||
setTimeout(() => { process.exit(1); }, 5);
|
||||
42
|
||||
`);
|
||||
assert.strictEqual(taskResult, 42);
|
||||
|
||||
await assert.rejects(pool.run(`
|
||||
'use strict';
|
||||
|
||||
const { promisify } = require('node:util');
|
||||
const sleep = promisify(setTimeout);
|
||||
async function _() {
|
||||
await sleep(5);
|
||||
return 42
|
||||
}
|
||||
_();
|
||||
`), /worker exited with code: 1/);
|
||||
const secondThreadId = pool.threads[0].threadId;
|
||||
|
||||
assert.notStrictEqual(originalThreadId, secondThreadId);
|
||||
});
|
||||
+84
@@ -0,0 +1,84 @@
|
||||
import assert from 'node:assert/strict';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import { cpus } from 'node:os';
|
||||
import { once } from 'node:events';
|
||||
import Piscina from '..';
|
||||
|
||||
test('will start with minThreads and max out at maxThreads', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 2,
|
||||
maxThreads: 4,
|
||||
concurrentTasksPerWorker: 1
|
||||
});
|
||||
let counter = 0;
|
||||
|
||||
pool.on('workerCreate', () => {
|
||||
counter++;
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
|
||||
assert.rejects(pool.run('while(true) {}'));
|
||||
assert.rejects(pool.run('while(true) {}'));
|
||||
|
||||
// #3
|
||||
assert.rejects(pool.run('while(true) {}'));
|
||||
await once(pool, 'workerCreate');
|
||||
|
||||
// #4
|
||||
assert.rejects(pool.run('while(true) {}'));
|
||||
await once(pool, 'workerCreate');
|
||||
|
||||
// #4 - as spawn does not happen synchronously anymore, we wait for the signal once more
|
||||
assert.rejects(pool.run('while(true) {}'));
|
||||
await once(pool, 'workerCreate');
|
||||
|
||||
assert.strictEqual(pool.threads.length, 4);
|
||||
await pool.destroy();
|
||||
assert.strictEqual(pool.threads.length, 0);
|
||||
assert.strictEqual(counter, 4);
|
||||
});
|
||||
|
||||
test('low maxThreads sets minThreads', () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
maxThreads: 1
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 1);
|
||||
assert.strictEqual(pool.options.minThreads, 1);
|
||||
assert.strictEqual(pool.options.maxThreads, 1);
|
||||
});
|
||||
|
||||
test('high minThreads sets maxThreads', {
|
||||
skip: cpus().length > 8
|
||||
}, () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 16
|
||||
});
|
||||
|
||||
assert.strictEqual(pool.threads.length, 16);
|
||||
assert.strictEqual(pool.options.minThreads, 16);
|
||||
assert.strictEqual(pool.options.maxThreads, 16);
|
||||
});
|
||||
|
||||
test('conflicting min/max threads is error', () => {
|
||||
assert.throws(() => new Piscina({
|
||||
minThreads: 16,
|
||||
maxThreads: 8
|
||||
}), /options.minThreads and options.maxThreads must not conflict/);
|
||||
});
|
||||
|
||||
test('thread count should be 0 upon destruction', async () => {
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 2,
|
||||
maxThreads: 4
|
||||
});
|
||||
assert.strictEqual(pool.threads.length, 2);
|
||||
await pool.destroy();
|
||||
assert.strictEqual(pool.threads.length, 0);
|
||||
});
|
||||
+54
@@ -0,0 +1,54 @@
|
||||
import assert from 'node:assert';
|
||||
import { test } from 'node:test';
|
||||
import { resolve } from 'node:path';
|
||||
import Piscina from '../dist';
|
||||
|
||||
test('workers are marked as destroyed if destroyed', async () => {
|
||||
let index = 0;
|
||||
// Its expected to have one task get balanced twice due to the load balancer distribution
|
||||
// first task enters, its distributed; second is enqueued, once first is done, second is distributed and normalizes
|
||||
let workersFirstRound = [];
|
||||
let workersSecondRound = [];
|
||||
const pool = new Piscina({
|
||||
filename: resolve(__dirname, 'fixtures/eval.js'),
|
||||
minThreads: 2,
|
||||
maxThreads: 2,
|
||||
concurrentTasksPerWorker: 1,
|
||||
loadBalancer (_task, workers) {
|
||||
if (workersFirstRound.length === 0) {
|
||||
workersFirstRound = workers;
|
||||
workersSecondRound = workers;
|
||||
} else if (
|
||||
workersFirstRound[0].id !== workers[0].id
|
||||
) {
|
||||
workersSecondRound = workers;
|
||||
}
|
||||
// Verify distribution to properly test this feature
|
||||
const candidate = workers[index++ % workers.length];
|
||||
|
||||
if (candidate.currentUsage !== 0 && !candidate.isRunningAbortableTask) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return candidate;
|
||||
}
|
||||
});
|
||||
|
||||
const tasks = [];
|
||||
const controller = new AbortController();
|
||||
const signal = controller.signal;
|
||||
tasks.push(pool.run('while (true) {}', {
|
||||
signal
|
||||
}));
|
||||
|
||||
for (let n = 0; n < 5; n++) {
|
||||
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 5))'));
|
||||
}
|
||||
|
||||
controller.abort();
|
||||
await Promise.allSettled(tasks);
|
||||
assert.notStrictEqual(workersFirstRound, workersSecondRound);
|
||||
assert.strictEqual(workersFirstRound.length, 2);
|
||||
assert.ok(workersFirstRound[0].destroyed);
|
||||
assert.ok(!workersFirstRound[0].terminating);
|
||||
});
|
||||
Reference in New Issue
Block a user