worker_threads

This commit is contained in:
SheetJS 2022-12-14 06:46:23 -05:00
parent 292e61d647
commit ae9e15716b
2 changed files with 233 additions and 4 deletions

@ -75,7 +75,7 @@ This breaks web frameworks that use the filesystem in body parsing.
Deno provides the basic elements to implement a server. It does not provide a
body parser out of the box.
### Drash
#### Drash
In testing, [Drash](https://drash.land/drash/) had an in-memory body parser
which could handle file uploads on hosted services like Deno Deploy.
@ -223,7 +223,15 @@ The page should show the contents of the file as an HTML table.
## NodeJS
### Express
When processing small files, the work is best handled in the server response
handler function. This approach is used in the "Framework Demos" section.
When processing large files, the direct approach will freeze the server. NodeJS
provides "Worker Threads" for this exact use case.
### Framework Demos
#### Express
The `express-formidable` middleware is powered by the `formidable` parser. It
adds a `files` property to the request.
@ -304,7 +312,7 @@ It should prompt to download `SheetJSExpress.xlsx`
</details>
### NestJS
#### NestJS
[The NestJS docs](https://docs.nestjs.com/techniques/file-upload) have detailed
instructions for file upload support. In the controller, the `path` property
@ -413,7 +421,7 @@ It should prompt to download `SheetJSNest.xlsx`
</details>
### Fastify
#### Fastify
:::note
@ -552,3 +560,136 @@ The response should show the data in CSV rows.
It should prompt to download `SheetJSFastify.xlsx`
</details>
### Worker Threads
NodeJS "Worker Threads" were introduced in v14 and eventually marked as stable
in v16. Coupled with `AsyncResource`, a simple thread pool enables processing
without blocking the server! The official NodeJS docs include a sample worker
pool implementation.
This example uses ExpressJS to create a general XLSX conversion service, but
the same approach applies to any NodeJS server side framework.
When reading large files, it is strongly recommended to run the body parser in
the main server process. Body parsers like `formidable` will write uploaded
files to the filesystem, and the file path should be passed to the worker (and
the worker would be responsible for reading and cleaning up the files).
:::note
The `child_process` module can also spawn [command-line tools](/docs/demos/cli).
That approach is not explored in this demo.
:::
<details><summary><b>Complete Example</b> (click to show)</summary>
:::note
This demo was verified on 2022 December 14
:::
0) Create a simple ECMAScript-Module-enabled `package.json`:
```json title="package.json"
{ "type": "module" }
```
1) Install the dependencies:
```bash
npm i --save https://cdn.sheetjs.com/xlsx-latest/xlsx-latest.tgz express@4.18.2 formidable@2.1.1
```
2) Create a worker script `worker.js` that listens for messages. When a message
is received, it will read the file from the filesystem, generate and pass back a
new XLSX file, and delete the original file:
```js title="worker.js"
/* load the worker_threads module */
import { parentPort } from 'node:worker_threads';
/* load the SheetJS module and hook to FS */
import { set_fs, readFile, write } from 'xlsx';
import * as fs from 'fs';
set_fs(fs);
/* the server will send a message with the `path` field */
parentPort.on('message', (task) => {
/* highlight-start */
// read file
const wb = readFile(task.path, { dense: true });
// send back XLSX
parentPort.postMessage(write(wb, { type: "buffer", bookType: "xlsx" }));
/* highlight-end */
// remove file
fs.unlink(task.path, ()=>{});
});
```
3) Download [`worker_pool.js`](pathname:///server/worker_pool.js):
```bash
curl -LO https://docs.sheetjs.com/server/worker_pool.js
```
(this is a slightly modified version of the example in the NodeJS docs)
4) Save the following server code to `main.mjs`:
```js title="main.mjs"
/* load dependencies */
import os from 'node:os';
import process from 'node:process'
import express from 'express';
import formidable from 'formidable';
/* load worker pool */
import WorkerPool from './worker_pool.js';
const pool = new WorkerPool(os.cpus().length);
process.on("beforeExit", () => { pool.close(); })
/* create server */
const app = express();
app.post('/', (req, res, next) => {
// parse body
const form = formidable({});
form.parse(req, (err, fields, files) => {
// look for "upload" field
if(err) return next(err);
if(!files["upload"]) return next(new Error("missing `upload` file"));
// send a message to the worker with the path to the uploaded file
// highlight-next-line
pool.runTask({ path: files["upload"].filepath }, (err, result) => {
if(err) return next(err);
// send the file back as an attachment
res.attachment("SheetJSPool.xlsx");
res.status(200).end(result);
});
});
});
// start server
app.listen(7262, () => { console.log(`Example app listening on port 7262`); });
```
5) Run the server:
```bash
node main.mjs
```
Test with the [`pres.numbers` sample file](https://sheetjs.com/pres.numbers):
```bash
curl -LO https://sheetjs.com/pres.numbers
curl -X POST -F upload=@pres.numbers http://localhost:7262/ -J -O
```
This will generate `SheetJSPool.xlsx`.
</details>

@ -0,0 +1,88 @@
// This example from https://nodejs.org/dist/latest/docs/api/worker_threads.html
// Documentation code redistributed under the MIT license.
// Copyright Node.js contributors
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import { Worker } from 'node:worker_threads';
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(new URL('worker.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}