From 3a8cae2bba9500454ba394e21ed9af21f5c0c320 Mon Sep 17 00:00:00 2001 From: avivkeller Date: Thu, 10 Apr 2025 11:38:58 -0400 Subject: [PATCH 1/7] feat(perf): offload generators to worker threads --- bin/cli.mjs | 9 ++ src/generators.mjs | 105 +++++++++++++++++- src/generators/json-simple/index.mjs | 10 -- src/generators/legacy-html-all/index.mjs | 2 +- src/generators/legacy-html/index.mjs | 3 +- .../legacy-html/utils/buildDropdowns.mjs | 3 +- .../legacy-json/utils/buildSection.mjs | 2 +- src/linter/tests/fixtures/entries.mjs | 9 -- src/metadata.mjs | 11 -- src/test/metadata.test.mjs | 12 +- src/types.d.ts | 13 +-- 11 files changed, 125 insertions(+), 54 deletions(-) diff --git a/bin/cli.mjs b/bin/cli.mjs index 27c3d069..134730db 100755 --- a/bin/cli.mjs +++ b/bin/cli.mjs @@ -77,6 +77,12 @@ program .choices(Object.keys(reporters)) .default('console') ) + .addOption( + new Option( + '--disable-parallelism', + 'Disable the use of multiple threads' + ).default(false) + ) .parse(process.argv); /** @@ -108,6 +114,7 @@ const { lintDryRun, gitRef, reporter, + disableParallelism, } = program.opts(); const linter = createLinter(lintDryRun, disableRule); @@ -142,6 +149,8 @@ if (target) { // An URL containing a git ref URL pointing to the commit or ref that was used // to generate the API docs. This is used to link to the source code of the gitRef, + // Disable the use of parallel threads + disableParallelism, }); } diff --git a/src/generators.mjs b/src/generators.mjs index bb03c290..68241b46 100644 --- a/src/generators.mjs +++ b/src/generators.mjs @@ -1,5 +1,8 @@ 'use strict'; +import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'; +import os from 'os'; + import publicGenerators from './generators/index.mjs'; import astJs from './generators/ast-js/index.mjs'; import oramaDb from './generators/orama-db/index.mjs'; @@ -12,6 +15,25 @@ const availableGenerators = { 'orama-db': oramaDb, }; +// Thread pool max limit +const MAX_THREADS = Math.max(1, os.cpus().length - 1); + +// If inside a worker thread, perform the generator logic here +if (!isMainThread) { + const { name, dependencyOutput, extra } = workerData; + const generator = availableGenerators[name]; + + // Execute the generator and send the result back to the parent thread + generator + .generate(dependencyOutput, extra) + .then(result => { + parentPort.postMessage(result); + }) + .catch(error => { + parentPort.postMessage({ error }); + }); +} + /** * @typedef {{ ast: GeneratorMetadata}} AstGenerator The AST "generator" is a facade for the AST tree and it isn't really a generator * @typedef {AvailableGenerators & AstGenerator} AllGenerators A complete set of the available generators, including the AST one @@ -43,22 +65,92 @@ const createGenerator = markdownInput => { */ const cachedGenerators = { ast: Promise.resolve(markdownInput) }; + // Keep track of how many threads are currently running + let activeThreads = 0; + const threadQueue = []; + + /** + * + * @param name + * @param dependencyOutput + * @param extra + */ + const runInWorker = (name, dependencyOutput, extra) => { + return new Promise((resolve, reject) => { + /** + * + */ + const run = () => { + activeThreads++; + + const worker = new Worker(new URL(import.meta.url), { + workerData: { name, dependencyOutput, extra }, + }); + + worker.on('message', result => { + activeThreads--; + processQueue(); + + if (result && result.error) { + reject(result.error); + } else { + resolve(result); + } + }); + + worker.on('error', err => { + activeThreads--; + processQueue(); + reject(err); + }); + }; + + if (activeThreads >= MAX_THREADS) { + threadQueue.push(run); + } else { + run(); + } + }); + }; + + /** + * + */ + const processQueue = () => { + if (threadQueue.length > 0 && activeThreads < MAX_THREADS) { + const next = threadQueue.shift(); + next(); + } + }; + /** * Runs the Generator engine with the provided top-level input and the given generator options * * @param {GeneratorOptions} options The options for the generator runtime */ - const runGenerators = async ({ generators, ...extra }) => { + const runGenerators = async ({ + generators, + disableParallelism = false, + ...extra + }) => { // Note that this method is blocking, and will only execute one generator per-time // but it ensures all dependencies are resolved, and that multiple bottom-level generators // can reuse the already parsed content from the top-level/dependency generators for (const generatorName of generators) { - const { dependsOn, generate } = availableGenerators[generatorName]; + const { + dependsOn, + generate, + parallizable = true, + } = availableGenerators[generatorName]; // If the generator dependency has not yet been resolved, we resolve // the dependency first before running the current generator - if (dependsOn && dependsOn in cachedGenerators === false) { - await runGenerators({ ...extra, generators: [dependsOn] }); + if (dependsOn && !(dependsOn in cachedGenerators)) { + await runGenerators({ + ...extra, + disableParallelism, + generators: [dependsOn], + }); } // Ensures that the dependency output gets resolved before we run the current @@ -66,7 +158,10 @@ const createGenerator = markdownInput => { const dependencyOutput = await cachedGenerators[dependsOn]; // Adds the current generator execution Promise to the cache - cachedGenerators[generatorName] = generate(dependencyOutput, extra); + cachedGenerators[generatorName] = + disableParallelism || !parallizable + ? generate(dependencyOutput, extra) // Run in main thread + : runInWorker(generatorName, dependencyOutput, extra); // Offload to worker thread } // Returns the value of the last generator of the current pipeline diff --git a/src/generators/json-simple/index.mjs b/src/generators/json-simple/index.mjs index c3797aa3..f23395c0 100644 --- a/src/generators/json-simple/index.mjs +++ b/src/generators/json-simple/index.mjs @@ -6,7 +6,6 @@ import { join } from 'node:path'; import { remove } from 'unist-util-remove'; import createQueries from '../../utils/queries/index.mjs'; -import { getRemark } from '../../utils/remark.mjs'; /** * This generator generates a simplified JSON version of the API docs and returns it as a string @@ -35,9 +34,6 @@ export default { * @param {Partial} options */ async generate(input, options) { - // Gets a remark processor for stringifying the AST tree into JSON - const remarkProcessor = getRemark(); - // Iterates the input (ApiDocMetadataEntry) and performs a few changes const mappedInput = input.map(node => { // Deep clones the content nodes to avoid affecting upstream nodes @@ -50,12 +46,6 @@ export default { createQueries.UNIST.isHeading, ]); - /** - * For the JSON generate we want to transform the whole content into JSON - * @returns {string} The stringified JSON version of the content - */ - content.toJSON = () => remarkProcessor.stringify(content); - return { ...node, content }; }); diff --git a/src/generators/legacy-html-all/index.mjs b/src/generators/legacy-html-all/index.mjs index 7004aa4f..77f8da7e 100644 --- a/src/generators/legacy-html-all/index.mjs +++ b/src/generators/legacy-html-all/index.mjs @@ -86,7 +86,7 @@ export default { .replace('__ID__', 'all') .replace(/__FILENAME__/g, 'all') .replace('__SECTION__', 'All') - .replace(/__VERSION__/g, `v${version.toString()}`) + .replace(/__VERSION__/g, `v${version.version}`) .replace(/__TOC__/g, tableOfContents.wrapToC(aggregatedToC)) .replace(/__GTOC__/g, parsedSideNav) .replace('__CONTENT__', aggregatedContent) diff --git a/src/generators/legacy-html/index.mjs b/src/generators/legacy-html/index.mjs index c7f836d7..77aefb6c 100644 --- a/src/generators/legacy-html/index.mjs +++ b/src/generators/legacy-html/index.mjs @@ -84,7 +84,6 @@ export default { */ const replaceTemplateValues = values => { const { api, added, section, version, toc, nav, content } = values; - return apiTemplate .replace('__ID__', api) .replace(/__FILENAME__/g, api) @@ -139,7 +138,7 @@ export default { api: head.api, added: head.introduced_in ?? '', section: head.heading.data.name || apiAsHeading, - version: `v${version.toString()}`, + version: `v${version.version}`, toc: String(parsedToC), nav: String(activeSideNav), content: parsedContent, diff --git a/src/generators/legacy-html/utils/buildDropdowns.mjs b/src/generators/legacy-html/utils/buildDropdowns.mjs index 01b76f78..cefa0f15 100644 --- a/src/generators/legacy-html/utils/buildDropdowns.mjs +++ b/src/generators/legacy-html/utils/buildDropdowns.mjs @@ -60,8 +60,9 @@ const buildNavigation = navigationContents => const buildVersions = (api, added, versions) => { // All Node.js versions that support the current API; If there's no "introduced_at" field, // we simply show all versions, as we cannot pinpoint the exact version + const coercedMajor = major(coerceSemVer(added)); const compatibleVersions = versions.filter(({ version }) => - added ? major(version) >= major(coerceSemVer(added)) : true + added ? version.major >= coercedMajor : true ); // Parses the SemVer version into something we use for URLs and to display the Node.js version diff --git a/src/generators/legacy-json/utils/buildSection.mjs b/src/generators/legacy-json/utils/buildSection.mjs index c284286b..62da17f8 100644 --- a/src/generators/legacy-json/utils/buildSection.mjs +++ b/src/generators/legacy-json/utils/buildSection.mjs @@ -58,7 +58,7 @@ export const createSectionBuilder = () => { * @param {import('../types.d.ts').HierarchizedEntry} entry - The entry providing stability information. */ const parseStability = (section, nodes, { stability }) => { - const stabilityInfo = stability.toJSON()?.[0]; + const stabilityInfo = stability.children.map(node => node.data); if (stabilityInfo) { section.stability = stabilityInfo.index; diff --git a/src/linter/tests/fixtures/entries.mjs b/src/linter/tests/fixtures/entries.mjs index 6b40892a..1319c18e 100644 --- a/src/linter/tests/fixtures/entries.mjs +++ b/src/linter/tests/fixtures/entries.mjs @@ -1,10 +1,3 @@ -/** - * Noop function. - * - * @returns {any} - */ -const noop = () => {}; - /** * @type {ApiDocMetadataEntry} */ @@ -69,12 +62,10 @@ export const assertEntry = { slug: 'assert', type: 'property', }, - toJSON: noop, }, stability: { type: 'root', children: [], - toJSON: noop, }, content: { type: 'root', diff --git a/src/metadata.mjs b/src/metadata.mjs index 8c30a80f..df954074 100644 --- a/src/metadata.mjs +++ b/src/metadata.mjs @@ -140,17 +140,6 @@ const createMetadata = slugger => { internalMetadata.heading.data.type = type ?? internalMetadata.heading.data.type; - /** - * Defines the toJSON method for the Heading AST node to be converted as JSON - */ - internalMetadata.heading.toJSON = () => internalMetadata.heading.data; - - /** - * Maps the Stability Index AST nodes into a JSON objects from their data properties - */ - internalMetadata.stability.toJSON = () => - internalMetadata.stability.children.map(node => node.data); - // Returns the Metadata entry for the API doc return { api: apiDoc.stem, diff --git a/src/test/metadata.test.mjs b/src/test/metadata.test.mjs index a36a99e9..eff0f9f8 100644 --- a/src/test/metadata.test.mjs +++ b/src/test/metadata.test.mjs @@ -33,7 +33,6 @@ describe('createMetadata', () => { }; metadata.addStability(stability); const actual = metadata.create(new VFile(), {}).stability; - delete actual.toJSON; deepStrictEqual(actual, { children: [stability], type: 'root', @@ -82,8 +81,15 @@ describe('createMetadata', () => { yaml_position: {}, }; const actual = metadata.create(apiDoc, section); - delete actual.stability.toJSON; - delete actual.heading.toJSON; deepStrictEqual(actual, expected); }); + + it('should be serializable', () => { + const { create } = createMetadata(new GitHubSlugger()); + const actual = create(new VFile({ path: 'test.md' }), { + type: 'root', + children: [], + }); + deepStrictEqual(structuredClone(actual), actual); + }); }); diff --git a/src/types.d.ts b/src/types.d.ts index e0aec952..7d513d43 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -3,12 +3,6 @@ import type { Program } from 'acorn'; import type { SemVer } from 'semver'; import type { Data, Node, Parent, Position } from 'unist'; -// String serialization of the AST tree -// @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/stringify#tojson_behavior -interface WithJSON extends T { - toJSON: () => J; -} - // Unist Node with typed Data, which allows better type inference interface NodeWithData extends T { data: J; @@ -88,12 +82,9 @@ declare global { // Any changes to the API doc Metadata changes: Array; // The parsed Markdown content of a Navigation Entry - heading: WithJSON; + heading: HeadingMetadataParent; // The API doc metadata Entry Stability Index if exists - stability: WithJSON< - StabilityIndexParent, - Array - >; + stability: StabilityIndexParent; // The subtree containing all Nodes of the API doc entry content: Root; // Extra YAML section entries that are stringd and serve From 26b56dc5c8d90b43173ad707ab84c8221376d7a1 Mon Sep 17 00:00:00 2001 From: Aviv Keller Date: Thu, 10 Apr 2025 11:41:48 -0400 Subject: [PATCH 2/7] Fix potential undefined access in parseStability --- src/generators/legacy-json/utils/buildSection.mjs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/generators/legacy-json/utils/buildSection.mjs b/src/generators/legacy-json/utils/buildSection.mjs index 62da17f8..51dea85d 100644 --- a/src/generators/legacy-json/utils/buildSection.mjs +++ b/src/generators/legacy-json/utils/buildSection.mjs @@ -58,7 +58,7 @@ export const createSectionBuilder = () => { * @param {import('../types.d.ts').HierarchizedEntry} entry - The entry providing stability information. */ const parseStability = (section, nodes, { stability }) => { - const stabilityInfo = stability.children.map(node => node.data); + const stabilityInfo = stability.children.map(node => node.data)?.[0]; if (stabilityInfo) { section.stability = stabilityInfo.index; From 0da3e20fbf5dd8c829b6a3b42155b8c422a03856 Mon Sep 17 00:00:00 2001 From: avivkeller Date: Thu, 10 Apr 2025 11:45:38 -0400 Subject: [PATCH 3/7] add jsdoc --- src/generators.mjs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/generators.mjs b/src/generators.mjs index 68241b46..91f7c8c1 100644 --- a/src/generators.mjs +++ b/src/generators.mjs @@ -70,15 +70,15 @@ const createGenerator = markdownInput => { const threadQueue = []; /** - * - * @param name - * @param dependencyOutput - * @param extra + * Run the input generator within a worker thread + * @param {keyof AllGenerators} name + * @param {any} dependencyOutput + * @param {Partial} extra */ const runInWorker = (name, dependencyOutput, extra) => { return new Promise((resolve, reject) => { /** - * + * Run the generator */ const run = () => { activeThreads++; @@ -114,7 +114,7 @@ const createGenerator = markdownInput => { }; /** - * + * Process the worker thread queue */ const processQueue = () => { if (threadQueue.length > 0 && activeThreads < MAX_THREADS) { From 1c16d087c165a3be85b2c97031bcfc81d45223dd Mon Sep 17 00:00:00 2001 From: avivkeller Date: Thu, 10 Apr 2025 12:30:55 -0400 Subject: [PATCH 4/7] code review --- bin/cli.mjs | 19 +++---- src/generators.mjs | 107 +++----------------------------------- src/generators/index.mjs | 10 +++- src/generators/types.d.ts | 7 ++- src/threading.mjs | 92 ++++++++++++++++++++++++++++++++ 5 files changed, 124 insertions(+), 111 deletions(-) create mode 100644 src/threading.mjs diff --git a/bin/cli.mjs b/bin/cli.mjs index 134730db..c4248de9 100755 --- a/bin/cli.mjs +++ b/bin/cli.mjs @@ -2,13 +2,14 @@ import { resolve } from 'node:path'; import process from 'node:process'; +import { cpus } from 'node:os'; import { Command, Option } from 'commander'; import { coerce } from 'semver'; import { DOC_NODE_CHANGELOG_URL, DOC_NODE_VERSION } from '../src/constants.mjs'; import createGenerator from '../src/generators.mjs'; -import generators from '../src/generators/index.mjs'; +import { publicGenerators } from '../src/generators/index.mjs'; import createLinter from '../src/linter/index.mjs'; import reporters from '../src/linter/reporters/index.mjs'; import rules from '../src/linter/rules/index.mjs'; @@ -16,7 +17,7 @@ import createMarkdownLoader from '../src/loaders/markdown.mjs'; import createMarkdownParser from '../src/parsers/markdown.mjs'; import createNodeReleases from '../src/releases.mjs'; -const availableGenerators = Object.keys(generators); +const availableGenerators = Object.keys(publicGenerators); const program = new Command(); @@ -79,14 +80,14 @@ program ) .addOption( new Option( - '--disable-parallelism', - 'Disable the use of multiple threads' - ).default(false) + '-p, --threads ', + 'The maximum number of threads to use. Set to 1 to disable parallelism' + ).default(Math.max(1, cpus().length - 1)) ) .parse(process.argv); /** - * @typedef {keyof generators} Target A list of the available generator names. + * @typedef {keyof publicGenerators} Target A list of the available generator names. * * @typedef {Object} Options * @property {Array|string} input Specifies the glob/path for input files. @@ -114,7 +115,7 @@ const { lintDryRun, gitRef, reporter, - disableParallelism, + threads, } = program.opts(); const linter = createLinter(lintDryRun, disableRule); @@ -149,8 +150,8 @@ if (target) { // An URL containing a git ref URL pointing to the commit or ref that was used // to generate the API docs. This is used to link to the source code of the gitRef, - // Disable the use of parallel threads - disableParallelism, + // How many threads should be used + threads, }); } diff --git a/src/generators.mjs b/src/generators.mjs index 91f7c8c1..0afdcf5f 100644 --- a/src/generators.mjs +++ b/src/generators.mjs @@ -1,38 +1,7 @@ 'use strict'; -import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'; -import os from 'os'; - -import publicGenerators from './generators/index.mjs'; -import astJs from './generators/ast-js/index.mjs'; -import oramaDb from './generators/orama-db/index.mjs'; - -const availableGenerators = { - ...publicGenerators, - // This one is a little special since we don't want it to run unless we need - // it and we also don't want it to be publicly accessible through the CLI. - 'ast-js': astJs, - 'orama-db': oramaDb, -}; - -// Thread pool max limit -const MAX_THREADS = Math.max(1, os.cpus().length - 1); - -// If inside a worker thread, perform the generator logic here -if (!isMainThread) { - const { name, dependencyOutput, extra } = workerData; - const generator = availableGenerators[name]; - - // Execute the generator and send the result back to the parent thread - generator - .generate(dependencyOutput, extra) - .then(result => { - parentPort.postMessage(result); - }) - .catch(error => { - parentPort.postMessage({ error }); - }); -} +import { allGenerators } from './generators/index.mjs'; +import { WorkerPool } from './threading.mjs'; /** * @typedef {{ ast: GeneratorMetadata}} AstGenerator The AST "generator" is a facade for the AST tree and it isn't really a generator @@ -65,74 +34,14 @@ const createGenerator = markdownInput => { */ const cachedGenerators = { ast: Promise.resolve(markdownInput) }; - // Keep track of how many threads are currently running - let activeThreads = 0; - const threadQueue = []; - - /** - * Run the input generator within a worker thread - * @param {keyof AllGenerators} name - * @param {any} dependencyOutput - * @param {Partial} extra - */ - const runInWorker = (name, dependencyOutput, extra) => { - return new Promise((resolve, reject) => { - /** - * Run the generator - */ - const run = () => { - activeThreads++; - - const worker = new Worker(new URL(import.meta.url), { - workerData: { name, dependencyOutput, extra }, - }); - - worker.on('message', result => { - activeThreads--; - processQueue(); - - if (result && result.error) { - reject(result.error); - } else { - resolve(result); - } - }); - - worker.on('error', err => { - activeThreads--; - processQueue(); - reject(err); - }); - }; - - if (activeThreads >= MAX_THREADS) { - threadQueue.push(run); - } else { - run(); - } - }); - }; - - /** - * Process the worker thread queue - */ - const processQueue = () => { - if (threadQueue.length > 0 && activeThreads < MAX_THREADS) { - const next = threadQueue.shift(); - next(); - } - }; + const threadPool = new WorkerPool(); /** * Runs the Generator engine with the provided top-level input and the given generator options * * @param {GeneratorOptions} options The options for the generator runtime */ - const runGenerators = async ({ - generators, - disableParallelism = false, - ...extra - }) => { + const runGenerators = async ({ generators, threads, ...extra }) => { // Note that this method is blocking, and will only execute one generator per-time // but it ensures all dependencies are resolved, and that multiple bottom-level generators // can reuse the already parsed content from the top-level/dependency generators @@ -141,14 +50,14 @@ const createGenerator = markdownInput => { dependsOn, generate, parallizable = true, - } = availableGenerators[generatorName]; + } = allGenerators[generatorName]; // If the generator dependency has not yet been resolved, we resolve // the dependency first before running the current generator if (dependsOn && !(dependsOn in cachedGenerators)) { await runGenerators({ ...extra, - disableParallelism, + threads, generators: [dependsOn], }); } @@ -159,9 +68,9 @@ const createGenerator = markdownInput => { // Adds the current generator execution Promise to the cache cachedGenerators[generatorName] = - disableParallelism || !parallizable + threads < 2 || !parallizable ? generate(dependencyOutput, extra) // Run in main thread - : runInWorker(generatorName, dependencyOutput, extra); // Offload to worker thread + : threadPool.run(generatorName, dependencyOutput, threads, extra); // Offload to worker thread } // Returns the value of the last generator of the current pipeline diff --git a/src/generators/index.mjs b/src/generators/index.mjs index 19ba9073..f4bb744a 100644 --- a/src/generators/index.mjs +++ b/src/generators/index.mjs @@ -9,8 +9,9 @@ import legacyJsonAll from './legacy-json-all/index.mjs'; import addonVerify from './addon-verify/index.mjs'; import apiLinks from './api-links/index.mjs'; import oramaDb from './orama-db/index.mjs'; +import astJs from './ast-js/index.mjs'; -export default { +export const publicGenerators = { 'json-simple': jsonSimple, 'legacy-html': legacyHtml, 'legacy-html-all': legacyHtmlAll, @@ -21,3 +22,10 @@ export default { 'api-links': apiLinks, 'orama-db': oramaDb, }; + +export const allGenerators = { + ...publicGenerators, + // This one is a little special since we don't want it to run unless we need + // it and we also don't want it to be publicly accessible through the CLI. + 'ast-js': astJs, +}; diff --git a/src/generators/types.d.ts b/src/generators/types.d.ts index a3eb07b6..91d98ebe 100644 --- a/src/generators/types.d.ts +++ b/src/generators/types.d.ts @@ -1,11 +1,11 @@ import type { SemVer } from 'semver'; import type { ApiDocReleaseEntry } from '../types'; -import type availableGenerators from './index.mjs'; +import type { publicGenerators } from './index.mjs'; declare global { // All available generators as an inferable type, to allow Generator interfaces // to be type complete and runtime friendly within `runGenerators` - export type AvailableGenerators = typeof availableGenerators; + export type AvailableGenerators = typeof publicGenerators; // This is the runtime config passed to the API doc generators export interface GeneratorOptions { @@ -36,6 +36,9 @@ declare global { // i.e. https://github.com/nodejs/node/tree/2cb1d07e0f6d9456438016bab7db4688ab354fd2 // i.e. https://gitlab.com/someone/node/tree/HEAD gitRef: string; + + // The number of threads the process is allowed to use + threads: number; } export interface GeneratorMetadata { diff --git a/src/threading.mjs b/src/threading.mjs new file mode 100644 index 00000000..fe975e34 --- /dev/null +++ b/src/threading.mjs @@ -0,0 +1,92 @@ +import { + Worker, + isMainThread, + parentPort, + workerData, +} from 'node:worker_threads'; +import { allGenerators } from './generators/index.mjs'; + +// If inside a worker thread, perform the generator logic here +if (!isMainThread) { + const { name, dependencyOutput, extra } = workerData; + const generator = allGenerators[name]; + + // Execute the generator and send the result or error back to the parent thread + generator + .generate(dependencyOutput, extra) + .then(result => parentPort.postMessage(result)) + .catch(error => parentPort.postMessage({ error })); +} + +/** + * WorkerPool class to manage a pool of worker threads + */ +export class WorkerPool { + /** @private {number} - Number of active threads */ + activeThreads = 0; + /** @private {Array} - Queue of pending tasks */ + queue = []; + + /** + * Runs a generator within a worker thread. + * @param {string} name - The name of the generator to execute + * @param {any} dependencyOutput - Input data for the generator + * @param {number} threads - Maximum number of threads to run concurrently + * @param {Object} extra - Additional options for the generator + * @returns {Promise} Resolves with the generator result, or rejects with an error + */ + run(name, dependencyOutput, threads, extra) { + return new Promise((resolve, reject) => { + /** + * Function to run the generator in a worker thread + */ + const run = () => { + this.activeThreads++; + + // Create and start the worker thread + const worker = new Worker(new URL(import.meta.url), { + workerData: { name, dependencyOutput, extra }, + }); + + // Handle worker thread messages (result or error) + worker.on('message', result => { + this.activeThreads--; + this.processQueue(threads); + + if (result?.error) { + reject(result.error); + } else { + resolve(result); + } + }); + + // Handle worker thread errors + worker.on('error', err => { + this.activeThreads--; + this.processQueue(threads); + reject(err); + }); + }; + + // If the active thread count exceeds the limit, add the task to the queue + if (this.activeThreads >= threads) { + this.queue.push(run); + } else { + run(); + } + }); + } + + /** + * Process the worker thread queue to start the next available task + * when there is room for more threads. + * @param {number} threads - Maximum number of threads to run concurrently + * @private + */ + processQueue(threads) { + if (this.queue.length > 0 && this.activeThreads < threads) { + const next = this.queue.shift(); + if (next) next(); + } + } +} From 939690583fde425e15ac4e6d84e9ff71a4324524 Mon Sep 17 00:00:00 2001 From: avivkeller Date: Thu, 10 Apr 2025 12:53:09 -0400 Subject: [PATCH 5/7] code review x1 --- src/generators.mjs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/generators.mjs b/src/generators.mjs index 0afdcf5f..aeba0059 100644 --- a/src/generators.mjs +++ b/src/generators.mjs @@ -54,7 +54,7 @@ const createGenerator = markdownInput => { // If the generator dependency has not yet been resolved, we resolve // the dependency first before running the current generator - if (dependsOn && !(dependsOn in cachedGenerators)) { + if (dependsOn && dependsOn in cachedGenerators === false) { await runGenerators({ ...extra, threads, From 2ad5e0186e472cb03fe6067659a70b51ed1f9287 Mon Sep 17 00:00:00 2001 From: avivkeller Date: Fri, 11 Apr 2025 16:27:27 -0400 Subject: [PATCH 6/7] code review --- src/generators.mjs | 10 ++---- src/{threading.mjs => threading/index.mjs} | 37 ++++++---------------- src/threading/worker.mjs | 11 +++++++ 3 files changed, 23 insertions(+), 35 deletions(-) rename src/{threading.mjs => threading/index.mjs} (69%) create mode 100644 src/threading/worker.mjs diff --git a/src/generators.mjs b/src/generators.mjs index aeba0059..bd058dbb 100644 --- a/src/generators.mjs +++ b/src/generators.mjs @@ -1,7 +1,7 @@ 'use strict'; import { allGenerators } from './generators/index.mjs'; -import { WorkerPool } from './threading.mjs'; +import WorkerPool from './threading/index.mjs'; /** * @typedef {{ ast: GeneratorMetadata}} AstGenerator The AST "generator" is a facade for the AST tree and it isn't really a generator @@ -46,11 +46,7 @@ const createGenerator = markdownInput => { // but it ensures all dependencies are resolved, and that multiple bottom-level generators // can reuse the already parsed content from the top-level/dependency generators for (const generatorName of generators) { - const { - dependsOn, - generate, - parallizable = true, - } = allGenerators[generatorName]; + const { dependsOn, generate } = allGenerators[generatorName]; // If the generator dependency has not yet been resolved, we resolve // the dependency first before running the current generator @@ -68,7 +64,7 @@ const createGenerator = markdownInput => { // Adds the current generator execution Promise to the cache cachedGenerators[generatorName] = - threads < 2 || !parallizable + threads < 2 ? generate(dependencyOutput, extra) // Run in main thread : threadPool.run(generatorName, dependencyOutput, threads, extra); // Offload to worker thread } diff --git a/src/threading.mjs b/src/threading/index.mjs similarity index 69% rename from src/threading.mjs rename to src/threading/index.mjs index fe975e34..732405f6 100644 --- a/src/threading.mjs +++ b/src/threading/index.mjs @@ -1,27 +1,9 @@ -import { - Worker, - isMainThread, - parentPort, - workerData, -} from 'node:worker_threads'; -import { allGenerators } from './generators/index.mjs'; - -// If inside a worker thread, perform the generator logic here -if (!isMainThread) { - const { name, dependencyOutput, extra } = workerData; - const generator = allGenerators[name]; - - // Execute the generator and send the result or error back to the parent thread - generator - .generate(dependencyOutput, extra) - .then(result => parentPort.postMessage(result)) - .catch(error => parentPort.postMessage({ error })); -} +import { Worker } from 'node:worker_threads'; /** * WorkerPool class to manage a pool of worker threads */ -export class WorkerPool { +export default class WorkerPool { /** @private {number} - Number of active threads */ activeThreads = 0; /** @private {Array} - Queue of pending tasks */ @@ -44,20 +26,19 @@ export class WorkerPool { this.activeThreads++; // Create and start the worker thread - const worker = new Worker(new URL(import.meta.url), { - workerData: { name, dependencyOutput, extra }, - }); + const worker = new Worker( + new URL(import.meta.resolve('./worker.mjs')), + { + workerData: { name, dependencyOutput, extra }, + } + ); // Handle worker thread messages (result or error) worker.on('message', result => { this.activeThreads--; this.processQueue(threads); - if (result?.error) { - reject(result.error); - } else { - resolve(result); - } + (result?.error ? reject : resolve)(result); }); // Handle worker thread errors diff --git a/src/threading/worker.mjs b/src/threading/worker.mjs new file mode 100644 index 00000000..73bbc29a --- /dev/null +++ b/src/threading/worker.mjs @@ -0,0 +1,11 @@ +import { parentPort, workerData } from 'node:worker_threads'; +import { allGenerators } from '../generators/index.mjs'; + +const { name, dependencyOutput, extra } = workerData; +const generator = allGenerators[name]; + +// Execute the generator and send the result or error back to the parent thread +generator + .generate(dependencyOutput, extra) + .then(result => parentPort.postMessage(result)) + .catch(error => parentPort.postMessage({ error })); From 17c2c33e3eadbc11c511ccda2bb93a420357d43c Mon Sep 17 00:00:00 2001 From: avivkeller Date: Fri, 11 Apr 2025 16:36:21 -0400 Subject: [PATCH 7/7] atomics --- src/threading/index.mjs | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/threading/index.mjs b/src/threading/index.mjs index 732405f6..648192ff 100644 --- a/src/threading/index.mjs +++ b/src/threading/index.mjs @@ -4,11 +4,29 @@ import { Worker } from 'node:worker_threads'; * WorkerPool class to manage a pool of worker threads */ export default class WorkerPool { - /** @private {number} - Number of active threads */ - activeThreads = 0; + /** @private {SharedArrayBuffer} - Shared memory for active thread count */ + sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT); + /** @private {Int32Array} - A typed array to access shared memory */ + activeThreads = new Int32Array(this.sharedBuffer); /** @private {Array} - Queue of pending tasks */ queue = []; + /** + * Gets the current active thread count. + * @returns {number} The current active thread count. + */ + getActiveThreadCount() { + return Atomics.load(this.activeThreads, 0); + } + + /** + * Changes the active thread count atomically by a given delta. + * @param {number} delta - The value to increment or decrement the active thread count by. + */ + changeActiveThreadCount(delta) { + Atomics.add(this.activeThreads, 0, delta); + } + /** * Runs a generator within a worker thread. * @param {string} name - The name of the generator to execute @@ -23,7 +41,7 @@ export default class WorkerPool { * Function to run the generator in a worker thread */ const run = () => { - this.activeThreads++; + this.changeActiveThreadCount(1); // Create and start the worker thread const worker = new Worker( @@ -35,7 +53,7 @@ export default class WorkerPool { // Handle worker thread messages (result or error) worker.on('message', result => { - this.activeThreads--; + this.changeActiveThreadCount(-1); this.processQueue(threads); (result?.error ? reject : resolve)(result); @@ -43,14 +61,14 @@ export default class WorkerPool { // Handle worker thread errors worker.on('error', err => { - this.activeThreads--; + this.changeActiveThreadCount(-1); this.processQueue(threads); reject(err); }); }; // If the active thread count exceeds the limit, add the task to the queue - if (this.activeThreads >= threads) { + if (this.getActiveThreadCount() >= threads) { this.queue.push(run); } else { run(); @@ -65,7 +83,7 @@ export default class WorkerPool { * @private */ processQueue(threads) { - if (this.queue.length > 0 && this.activeThreads < threads) { + if (this.queue.length > 0 && this.getActiveThreadCount() < threads) { const next = this.queue.shift(); if (next) next(); }