Best practice for streaming to HTTP clients? #2834

Closed
opened 2022-12-01 22:23:59 +00:00 by coolaj86 · 3 comments

My understanding is that the entire workbook buffer is always created in memory due to the forward pointers and such. Is that correct?

If so, it sounds like the best streaming strategy would actually be to write quickly to the fastest medium — likely local files (or maybe S3), NOT the network — and then stream from there to the slow clients.

Something like this:

'use strict';

let Crypto = require('node:crypto');
let Fs = require('node:fs/promises');
let FsStream = require('node:fs');
let Path = require('node:path');
let Stream = require('node:stream');

// SheetJS is published on npm as `xlsx`.
let XLSX = require('xlsx');

/*
 * Download route: render the workbook to a temp file first (fast, local),
 * then stream that file to the possibly-slow client and delete it once the
 * transfer finishes, fails, or the client disconnects.
 *
 * NOTE(review): `app` (an Express app) and `wb` (the workbook to export for
 * this sheet) are assumed to be in scope; neither is defined in this snippet.
 */
app.use('/api/sheets/:sheet_id.xlsx', async function (req, res, next) {
  let friendlyName = req.params.sheet_id;

  let tmpDir = './sheets/';
  let tmpPath = null;

  try {
    await Fs.mkdir(tmpDir, { recursive: true });

    // Timestamp + random suffix keeps concurrent exports from colliding.
    let rndName = Crypto.randomBytes(8).toString('hex');
    let now = Date.now();
    tmpPath = Path.join(tmpDir, `${now}-${rndName}.xlsx`);

    // `XLSX.write` returns the file contents; `XLSX.writeFile` would instead
    // write to disk itself, at the path given as its second argument.
    // This is synchronous CPU work and blocks the event loop while it runs.
    let buf = XLSX.write(wb, { type: 'buffer', bookType: 'xlsx' });
    await Fs.writeFile(tmpPath, buf);
  } catch (err) {
    if (tmpPath) {
      await removeTmpFile();
    }
    // Express 4 does not catch rejections from async handlers, so hand the
    // error to the error-handling middleware explicitly.
    next(err);
    return;
  }

  res.attachment(`${friendlyName}.xlsx`);

  // Unlike `.pipe`, `pipeline` propagates errors and tears down the other
  // stream when either side fails or the client disconnects. Its callback
  // runs exactly once, so the temp file is removed exactly once.
  Stream.pipeline(FsStream.createReadStream(tmpPath), res, function (err) {
    if (err) {
      console.error(`${friendlyName}: ${err.message}`);
    }
    void removeTmpFile();
  });

  // Best-effort delete of the temp file; a file that is already gone is fine.
  async function removeTmpFile() {
    await Fs.unlink(tmpPath).catch(function (err) {
      if ('ENOENT' !== err.code) {
        console.error(`${tmpPath}: ${err.message}`);
      }
    });
  }
});

Do you know of any nuance I should be aware of or have any other thoughts on that?

My understanding is that the entire workbook buffer is always created in memory due to the forward pointers and such. Is that correct? If so, it sounds like the best streaming strategy would actually to be to write quickly to the fastest medium - likely local files (or maybe s3), NOT the network - and then stream from that to the slow clients. Something like this: ```js 'use strict'; let Fs = require('node:fs/promises'); let FsStream = require('node:fs'); let Path = require('node:path'); let XLSX = require('sheetjs'); app.use('/api/sheets/:sheet_id.xlxs', async function (req, res) { let friendlyName = req.params.sheet_id; let tmpDir = './sheets/'; await Fs.mkdir(tmpDir, { recursive: true }); let rndName = Crypto.randomBytes(8).toString('hex'); let now = Date.now(); let tmpPath = Path.join(tmpDir, `${now}-${rnd}.xlsx`); let buf = XLSX.writeFile(wb, fliendlyName, { type: 'buffer', }) await Fs.writeFile(tmpPath, buf); let rs = FsStream.createReadStream(tmpPath); rs.once('error', cleanup); rs.once('close', cleanup); res.once('error', cleanup); res.once('close', cleanup); rs.pipe(res); async function cleanup(err) { if (err) { console.error(`${friendlyName}: ${err.message}`); } await Fs.unlink(tmpPath).catch(function (err) { if ('ENOENT' !== err.code) { console.error(`${tmpPath}: ${err.message}`); } }); if (!rs.closed) { rs.close(); } if (!res.writableEnded) { res.end(); } } }) ``` Do you know of any nuance I should be aware of or have any other thoughts on that?
Owner

To avoid blocking the main thread, the best approach for modern NodeJS is worker threads, probably using some sort of thread pool.

The idea is to use writeFile in the worker and pass the path back to the API route callback. The rest is similar to how you have laid out the code.

We probably should make a demo of this approach.

.

If you are comfortable changing the API design and are exporting large files, you can make the server respond with a random name and add a separate route for downloading exports given the name. Typically you would cleanup exports after X hours or days. This is spiritually similar to how services like Twitter make exports of personal data.

.

Under the hood, writeFile assembles the file in memory and writes. This approach "just works" in legacy and modern browsers, NodeJS and a number of other runtimes.

We've shied away from streaming XLSX write in the past since the current state of browser streaming is pretty abysmal. In case you are curious, https://docs.sheetjs.com/docs/demos/worker#file-system-access-api shows XLSX -> CSV conversion. Locally, against the file with 300K rows, writing at once takes ~2 sec while committing each row takes ~112 sec in Chrome 106.

If streaming XLSX write in NodeJS is of interest, we can take a closer look. The issues about forward references apply to streaming *parse*: XLSX is a ZIP-based file format, and the "table of contents" (the ZIP central directory) is stored at the end of the file.

To avoid blocking the main thread, the best approach for modern NodeJS is worker threads, probably using some sort of [thread pool](https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool). The idea is to use `writeFile` in the worker and pass the path back to the API route callback. The rest is similar to how you have laid out the code. We probably should make a demo of this approach. . If you are comfortable changing the API design and are exporting large files, you can make the server respond with a random name and add a separate route for downloading exports given the name. Typically you would cleanup exports after X hours or days. This is spiritually similar to how services like Twitter make exports of personal data. . Under the hood, `writeFile` assembles the file in memory and writes. This approach "just works" in legacy and modern browsers, NodeJS and a number of other runtimes. We've shied away from streaming XLSX write in the past since the current state of browser streaming is pretty abysmal. In case you are curious, https://docs.sheetjs.com/docs/demos/worker#file-system-access-api shows XLSX -> CSV conversion. Locally, against the file with 300K rows, writing at once takes ~2 sec while committing each row takes ~112 sec in Chrome 106. If streaming XLSX write in NodeJS is of interest, we can take a closer look. The issues about forward references apply to streaming parse (XLSX is a ZIP-based file format and the "table of contents" is stored at the end of the file)
Author

Re: streaming

My concern was more about memory backing up, but I can see what you're saying about using a lot of CPU at once and blocking the main thread.

Re: thread pool

Honestly, that thread pool stuff is way too complicated. Node just isn't the right tool for the job when you need multi-threading, IMO.

I'd recommend anyone just write a little API microservice instead and call out to it.

(we do this for LibreOffice pdf conversion https://github.com/savvi-legal/libreoffice-as-a-service/blob/main/routes/convert.js - though... I used it as an excuse to try out fastify, so it's probably equally complicated 🫤...)

Thanks

Great detail and suggestions. I appreciate it. :)

### Re: streaming My concern was more having memory backing up, but I can see what you're saying about using a lot of CPU at once and blocking the main thread. ### Re: thread pool Honestly, that thread pool stuff is way too complicated. Node just isn't the right tool for the job when you need multi-threading, IMO. I'd recommend anyone just write a little API microservice instead and call out to it. (we do this for LibreOffice pdf conversion https://github.com/savvi-legal/libreoffice-as-a-service/blob/main/routes/convert.js - though... I used it as an excuse to try out `fastify`, so it's probably equally complicated 🫤...) ### Thanks Great detail and suggestions. I appreciate it. :)
Owner

Mirroring the soffice example, you can write a NodeJS script and shell out to it (see https://docs.sheetjs.com/docs/demos/cli — a similar script was in our first release back in 2012!). The CLI script can query the backend for data and write the file.

.

To show worker threads, here's a small XLSX conversion service powered by ExpressJS. It accepts file uploads, parses the data, and sends back the converted data in the XLSX format.

0) create `package.json`:
# mark the project as ES modules so plain .js files may use `import`
printf '%s\n' '{ "type": "module" }' > package.json

Install the SheetJS library, Express, and formidable:

npm install --save \
  https://cdn.sheetjs.com/xlsx-0.19.1/xlsx-0.19.1.tgz \
  express@4.x \
  formidable@2.x
1) write the following to `task_processor.js`:
import { parentPort } from 'node:worker_threads';
import { set_fs, readFile, write } from 'xlsx';
import * as fs from 'fs';
set_fs(fs);

/*
 * Worker entry point. Each message is a task `{ path }` naming an uploaded
 * file: parse it, reply with the workbook re-encoded as an XLSX buffer, and
 * always delete the uploaded file afterwards.
 */
parentPort.on('message', (task) => {
  try {
    // read file
    const wb = readFile(task.path);
    // send back XLSX
    parentPort.postMessage(write(wb, { type: "buffer", bookType: "xlsx" }));
  } finally {
    // Remove the upload even when parsing fails. Any exception still escapes
    // this handler and surfaces as the Worker's 'error' event in the parent.
    // Deletion is best-effort, so its own errors are ignored.
    fs.unlink(task.path, () => {});
  }
});
2) copy the entire worker pool code from the Node.js docs page to `worker_pool.js`:
Code (click to show)
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import path from 'node:path';
import { Worker } from 'node:worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

/**
 * Single-use async resource holding a task's completion callback, so the
 * callback runs in the async context that was current when this object was
 * constructed.
 */
class WorkerPoolTaskInfo extends AsyncResource {
  /** @param {(err: Error|null, result: any) => void} callback */
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  /** Deliver `(err, result)` to the stored callback, then destroy this resource. */
  done(err, result) {
    const { callback } = this;
    this.runInAsyncScope(callback, null, err, result);
    this.emitDestroy(); // never reused once done
  }
}

/**
 * Fixed-size pool of worker threads, each running `task_processor.js`.
 *
 * `runTask(task, callback)` posts `task` to an idle worker, or queues it
 * (first in, first out) until one becomes free. `callback(err, result)`
 * receives either the worker's reply message or the error that killed the
 * worker. Crashed workers are replaced automatically. An error from a worker
 * with no task in flight is re-emitted as an 'error' event on the pool, so
 * callers should attach an 'error' listener.
 *
 * @param {number} numThreads - number of worker threads to keep alive.
 */
export default class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  /** Spawn one worker, register it, and mark it idle. */
  addNewWorker() {
    const worker = new Worker(new URL('task_processor.js', import.meta.url));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      const taskInfo = worker[kTaskInfo];
      worker[kTaskInfo] = null;

      // Forget the dead worker in BOTH lists first. A worker that dies while
      // idle is still in `freeWorkers`, and `runTask` would otherwise hand it
      // a task that never completes. Replacing it before reporting also means
      // an exception thrown while reporting cannot leave the pool short a
      // worker.
      for (const list of [this.workers, this.freeWorkers]) {
        const index = list.indexOf(worker);
        if (index !== -1) list.splice(index, 1);
      }
      this.addNewWorker();

      // Report the failure to the task's callback, or to the pool's 'error'
      // listeners when no task was in flight.
      if (taskInfo)
        taskInfo.done(err, null);
      else
        this.emit('error', err);
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  /**
   * Run `task` on an idle worker, or queue it until one is free.
   * @param {*} task - message posted to the worker.
   * @param {(err: Error|null, result: any) => void} callback
   */
  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  /** Terminate every worker in the pool. */
  close() {
    for (const worker of this.workers) worker.terminate();
  }
}
3) define the server (save to `main.mjs`):
import WorkerPool from './worker_pool.js';
import os from 'node:os';
import process from 'node:process';
import express from 'express';
import formidable from 'formidable';

// define pool: one worker thread per CPU core
const pool = new WorkerPool(os.cpus().length);
// The pool re-emits errors from idle workers as 'error'. Without a listener,
// EventEmitter throws on 'error' and the whole server would crash.
pool.on('error', (err) => { console.error(err); });
// NOTE: 'beforeExit' only fires once the event loop is empty. While the HTTP
// server is listening that never happens, so this hook only runs after the
// server has been closed.
process.on("beforeExit", () => { pool.close(); });

// express server
const app = express();

// endpoint: POST a multipart form with an "upload" file field; the reply is
// the same workbook converted to XLSX, sent as an attachment.
app.post('/', (req, res, next) => {
  // parse body
  const form = formidable({});
  form.parse(req, (err, fields, files) => {
    if(err) return next(err);
    // look for "upload" field; without it there is nothing to convert
    const upload = files["upload"];
    if(!upload) return res.status(400).send('Missing "upload" file field');
    // convert off the main thread
    pool.runTask({path: upload.filepath}, (err, result) => {
      // send back file
      if(err) return next(err);
      res.attachment("SheetJSPool.xlsx");
      res.status(200).end(result);
    });
  });
});

// start server
app.listen(7262, () => { console.log(`Example app listening on port 7262`); });
4) start the server:
# run the conversion service (it listens on port 7262)
node ./main.mjs
5) test by uploading `pres.numbers`:
# fetch the sample spreadsheet, keeping its remote filename
curl --location --remote-name https://sheetjs.com/pres.numbers
# upload it and save the reply under the server-supplied filename
curl --request POST --form upload=@pres.numbers --remote-header-name --remote-name http://localhost:7262/

This will generate SheetJSPool.xlsx which you can open.

Mirroring the soffice example, you can write a NodeJS script and shell-out (https://docs.sheetjs.com/docs/demos/cli a similar script was in our first release back in 2012!). You can write your CLI script to query the backend for data and write the file. . To show worker threads, here's a small XLSX conversion service powered by ExpressJS. It will accept file uploads, parse the data and send back the converted data in the XLSX format 0) create package.json ```bash echo '{ "type": "module" }' > package.json ``` install the SheetJS library, express, and formidable: ```bash npm i --save https://cdn.sheetjs.com/xlsx-0.19.1/xlsx-0.19.1.tgz express@4.x formidable@2.x ``` 1) write the following to `task_processor.js`: ```js import { parentPort } from 'node:worker_threads'; import { set_fs, readFile, write } from 'xlsx'; import * as fs from 'fs'; set_fs(fs); parentPort.on('message', (task) => { // read file const wb = readFile(task.path); // send back XLSX parentPort.postMessage(write(wb, { type: "buffer", bookType: "xlsx" })); // remove file fs.unlink(task.path, ()=>{}); }); ``` 2) copy the entire worker pool code from the docs page to `worker_pool.js`. <details><summary><b>Code</b> (click to show)</summary> ```js import { AsyncResource } from 'node:async_hooks'; import { EventEmitter } from 'node:events'; import path from 'node:path'; import { Worker } from 'node:worker_threads'; const kTaskInfo = Symbol('kTaskInfo'); const kWorkerFreedEvent = Symbol('kWorkerFreedEvent'); class WorkerPoolTaskInfo extends AsyncResource { constructor(callback) { super('WorkerPoolTaskInfo'); this.callback = callback; } done(err, result) { this.runInAsyncScope(this.callback, null, err, result); this.emitDestroy(); // `TaskInfo`s are used only once. 
} } export default class WorkerPool extends EventEmitter { constructor(numThreads) { super(); this.numThreads = numThreads; this.workers = []; this.freeWorkers = []; this.tasks = []; for (let i = 0; i < numThreads; i++) this.addNewWorker(); // Any time the kWorkerFreedEvent is emitted, dispatch // the next task pending in the queue, if any. this.on(kWorkerFreedEvent, () => { if (this.tasks.length > 0) { const { task, callback } = this.tasks.shift(); this.runTask(task, callback); } }); } addNewWorker() { const worker = new Worker(new URL('task_processor.js', import.meta.url)); worker.on('message', (result) => { // In case of success: Call the callback that was passed to `runTask`, // remove the `TaskInfo` associated with the Worker, and mark it as free // again. worker[kTaskInfo].done(null, result); worker[kTaskInfo] = null; this.freeWorkers.push(worker); this.emit(kWorkerFreedEvent); }); worker.on('error', (err) => { // In case of an uncaught exception: Call the callback that was passed to // `runTask` with the error. if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null); else this.emit('error', err); // Remove the worker from the list and start a new Worker to replace the // current one. this.workers.splice(this.workers.indexOf(worker), 1); this.addNewWorker(); }); this.workers.push(worker); this.freeWorkers.push(worker); this.emit(kWorkerFreedEvent); } runTask(task, callback) { if (this.freeWorkers.length === 0) { // No free threads, wait until a worker thread becomes free. 
this.tasks.push({ task, callback }); return; } const worker = this.freeWorkers.pop(); worker[kTaskInfo] = new WorkerPoolTaskInfo(callback); worker.postMessage(task); } close() { for (const worker of this.workers) worker.terminate(); } } ``` </details> 3) define server (save to `main.mjs`): ```js import WorkerPool from './worker_pool.js'; import os from 'node:os'; import process from 'node:process' import express from 'express'; import formidable from 'formidable'; // define pool const pool = new WorkerPool(os.cpus().length); process.on("beforeExit", () => { pool.close(); }) // express server const app = express(); // endpoint app.post('/', (req, res, next) => { // parse body const form = formidable({}); form.parse(req, (err, fields, files) => { // look for "upload" field if(err) return next(err); if(!files["upload"]) return next(); // or some error check // convert pool.runTask({path: files["upload"].filepath}, (err, result) => { // send back file if(err) return next(err); res.attachment("SheetJSPool.xlsx"); res.status(200).end(result); }); }); }); // start server app.listen(7262, () => { console.log(`Example app listening on port 7262`); }); ``` 4) start server: ```bash node main.mjs ``` 5) test by uploading `pres.numbers`: ```bash curl -LO https://sheetjs.com/pres.numbers curl -X POST -F upload=@pres.numbers http://localhost:7262/ -J -O ``` This will generate `SheetJSPool.xlsx` which you can open.
Sign in to join this conversation.
No Milestone
No Assignees
2 Participants
Notifications
Due Date
The due date is invalid or out of range. Please use the format 'yyyy-mm-dd'.

No due date set.

Dependencies

No dependencies set.

Reference: sheetjs/sheetjs#2834
No description provided.