From ae9e15716bfb50b931b268e901ade36a3269df3f Mon Sep 17 00:00:00 2001 From: SheetJS Date: Wed, 14 Dec 2022 06:46:23 -0500 Subject: [PATCH] worker_threads --- .../03-demos/{35-server.md => 09-server.md} | 149 +++++++++++++++++- docz/static/server/worker_pool.js | 88 +++++++++++ 2 files changed, 233 insertions(+), 4 deletions(-) rename docz/docs/03-demos/{35-server.md => 09-server.md} (80%) create mode 100644 docz/static/server/worker_pool.js diff --git a/docz/docs/03-demos/35-server.md b/docz/docs/03-demos/09-server.md similarity index 80% rename from docz/docs/03-demos/35-server.md rename to docz/docs/03-demos/09-server.md index 2417ea2..38797bd 100644 --- a/docz/docs/03-demos/35-server.md +++ b/docz/docs/03-demos/09-server.md @@ -75,7 +75,7 @@ This breaks web frameworks that use the filesystem in body parsing. Deno provides the basic elements to implement a server. It does not provide a body parser out of the box. -### Drash +#### Drash In testing, [Drash](https://drash.land/drash/) had an in-memory body parser which could handle file uploads on hosted services like Deno Deploy. @@ -223,7 +223,15 @@ The page should show the contents of the file as an HTML table. ## NodeJS -### Express +When processing small files, the work is best handled in the server response +handler function. This approach is used in the "Framework Demos" section. + +When processing large files, the direct approach will freeze the server. NodeJS +provides "Worker Threads" for this exact use case. + +### Framework Demos + +#### Express The `express-formidable` middleware is powered by the `formidable` parser. It adds a `files` property to the request. @@ -304,7 +312,7 @@ It should prompt to download `SheetJSExpress.xlsx` -### NestJS +#### NestJS [The NestJS docs](https://docs.nestjs.com/techniques/file-upload) have detailed instructions for file upload support. In the controller, the `path` property @@ -413,7 +421,7 @@ It should prompt to download `SheetJSNest.xlsx` -### Fastify +#### Fastify :::note @@ -552,3 +560,136 @@ The response should show the data in CSV rows. It should prompt to download `SheetJSFastify.xlsx` + +### Worker Threads + +NodeJS "Worker Threads" were introduced in v14 and eventually marked as stable +in v16. Coupled with `AsyncResource`, a simple thread pool enables processing +without blocking the server! The official NodeJS docs include a sample worker +pool implementation. + +This example uses ExpressJS to create a general XLSX conversion service, but +the same approach applies to any NodeJS server side framework. + +When reading large files, it is strongly recommended to run the body parser in +the main server process. Body parsers like `formidable` will write uploaded +files to the filesystem, and the file path should be passed to the worker (and +the worker would be responsible for reading and cleaning up the files). + +:::note + +The `child_process` module can also spawn [command-line tools](/docs/demos/cli). +That approach is not explored in this demo. + +::: + +
Complete Example (click to show) + +:::note + +This demo was verified on 2022 December 14 + +::: + +0) Create a simple ECMAScript-Module-enabled `package.json`: + +```json title="package.json" +{ "type": "module" } +``` + +1) Install the dependencies: + +```bash +npm i --save https://cdn.sheetjs.com/xlsx-latest/xlsx-latest.tgz express@4.18.2 formidable@2.1.1 +``` + +2) Create a worker script `worker.js` that listens for messages. When a message +is received, it will read the file from the filesystem, generate and pass back a +new XLSX file, and delete the original file: + +```js title="worker.js" +/* load the worker_threads module */ +import { parentPort } from 'node:worker_threads'; + +/* load the SheetJS module and hook to FS */ +import { set_fs, readFile, write } from 'xlsx'; +import * as fs from 'fs'; +set_fs(fs); + +/* the server will send a message with the `path` field */ +parentPort.on('message', (task) => { + /* highlight-start */ + // read file + const wb = readFile(task.path, { dense: true }); + // send back XLSX + parentPort.postMessage(write(wb, { type: "buffer", bookType: "xlsx" })); + /* highlight-end */ + // remove file + fs.unlink(task.path, ()=>{}); +}); +``` + +3) Download [`worker_pool.js`](pathname:///server/worker_pool.js): + +```bash +curl -LO https://docs.sheetjs.com/server/worker_pool.js +``` + +(this is a slightly modified version of the example in the NodeJS docs) + +4) Save the following server code to `main.mjs`: + +```js title="main.mjs" +/* load dependencies */ +import os from 'node:os'; +import process from 'node:process' +import express from 'express'; +import formidable from 'formidable'; + +/* load worker pool */ +import WorkerPool from './worker_pool.js'; + +const pool = new WorkerPool(os.cpus().length); +process.on("beforeExit", () => { pool.close(); }) + +/* create server */ +const app = express(); +app.post('/', (req, res, next) => { + // parse body + const form = formidable({}); + form.parse(req, (err, fields, files) => { + // look for "upload" field + if(err) return next(err); + if(!files["upload"]) return next(new Error("missing `upload` file")); + + // send a message to the worker with the path to the uploaded file + // highlight-next-line + pool.runTask({ path: files["upload"].filepath }, (err, result) => { + if(err) return next(err); + // send the file back as an attachment + res.attachment("SheetJSPool.xlsx"); + res.status(200).end(result); + }); + }); +}); + +// start server +app.listen(7262, () => { console.log(`Example app listening on port 7262`); }); +``` + +5) Run the server: + +```bash +node main.mjs +``` + +Test with the [`pres.numbers` sample file](https://sheetjs.com/pres.numbers): + +```bash +curl -LO https://sheetjs.com/pres.numbers +curl -X POST -F upload=@pres.numbers http://localhost:7262/ -J -O +``` + +This will generate `SheetJSPool.xlsx`. + +
diff --git a/docz/static/server/worker_pool.js b/docz/static/server/worker_pool.js new file mode 100644 index 0000000..88d33a2 --- /dev/null +++ b/docz/static/server/worker_pool.js @@ -0,0 +1,88 @@ +// This example from https://nodejs.org/dist/latest/docs/api/worker_threads.html +// Documentation code redistributed under the MIT license. +// Copyright Node.js contributors + +import { AsyncResource } from 'node:async_hooks'; +import { EventEmitter } from 'node:events'; +import { Worker } from 'node:worker_threads'; + +const kTaskInfo = Symbol('kTaskInfo'); +const kWorkerFreedEvent = Symbol('kWorkerFreedEvent'); + +class WorkerPoolTaskInfo extends AsyncResource { + constructor(callback) { + super('WorkerPoolTaskInfo'); + this.callback = callback; + } + + done(err, result) { + this.runInAsyncScope(this.callback, null, err, result); + this.emitDestroy(); // `TaskInfo`s are used only once. + } +} + +export default class WorkerPool extends EventEmitter { + constructor(numThreads) { + super(); + this.numThreads = numThreads; + this.workers = []; + this.freeWorkers = []; + this.tasks = []; + + for (let i = 0; i < numThreads; i++) + this.addNewWorker(); + + // Any time the kWorkerFreedEvent is emitted, dispatch + // the next task pending in the queue, if any. + this.on(kWorkerFreedEvent, () => { + if (this.tasks.length > 0) { + const { task, callback } = this.tasks.shift(); + this.runTask(task, callback); + } + }); + } + + addNewWorker() { + const worker = new Worker(new URL('worker.js', import.meta.url)); + worker.on('message', (result) => { + // In case of success: Call the callback that was passed to `runTask`, + // remove the `TaskInfo` associated with the Worker, and mark it as free + // again. + worker[kTaskInfo].done(null, result); + worker[kTaskInfo] = null; + this.freeWorkers.push(worker); + this.emit(kWorkerFreedEvent); + }); + worker.on('error', (err) => { + // In case of an uncaught exception: Call the callback that was passed to + // `runTask` with the error. + if (worker[kTaskInfo]) + worker[kTaskInfo].done(err, null); + else + this.emit('error', err); + // Remove the worker from the list and start a new Worker to replace the + // current one. + this.workers.splice(this.workers.indexOf(worker), 1); + this.addNewWorker(); + }); + this.workers.push(worker); + this.freeWorkers.push(worker); + this.emit(kWorkerFreedEvent); + } + + runTask(task, callback) { + if (this.freeWorkers.length === 0) { + // No free threads, wait until a worker thread becomes free. + this.tasks.push({ task, callback }); + return; + } + + const worker = this.freeWorkers.pop(); + worker[kTaskInfo] = new WorkerPoolTaskInfo(callback); + worker.postMessage(task); + } + + close() { + for (const worker of this.workers) worker.terminate(); + } +}