feat: chunked processing (streaming) #509
Codecov Report
Coverage diff, main vs. #509:
| Metric | main | #509 | +/- |
|---|---|---|---|
| Coverage | 75.71% | 80.81% | +5.09% |
| Files | 115 | 121 | +6 |
| Lines | 11181 | 11984 | +803 |
| Branches | 756 | 846 | +90 |
| Hits | 8466 | 9685 | +1219 |
| Misses | 2711 | 2296 | -415 |
| Partials | 4 | 3 | -1 |
Pull request overview
This PR implements chunked processing with streaming capabilities using async generators, enabling pipeline parallelism where downstream generators can begin processing results as soon as chunks complete from their dependencies, rather than waiting for all processing to finish.
Key Changes:
- Introduces streaming utilities for async chunk processing
- Adds a `stream()` method to parallel workers alongside the existing `map()` method
- Converts all generator `generate()` functions to async generators that yield chunks
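The pipeline parallelism described in this overview can be illustrated with a minimal sketch in plain JavaScript. The names `produceChunks`, `consumeChunks`, and `collect` are hypothetical and are not taken from this PR; the sketch only shows that a downstream async generator can start working as soon as the upstream one yields its first chunk.

```javascript
// Hypothetical upstream generator: yields one chunk of results at a time.
async function* produceChunks(items, chunkSize) {
  for (let i = 0; i < items.length; i += chunkSize) {
    // Simulate asynchronous work, e.g. a worker finishing a chunk.
    await new Promise(resolve => setTimeout(resolve, 1));
    yield items.slice(i, i + chunkSize);
  }
}

// Hypothetical downstream generator: handles each chunk as soon as it
// arrives instead of waiting for the upstream generator to finish.
async function* consumeChunks(source) {
  for await (const chunk of source) {
    yield chunk.map(n => n * 2);
  }
}

// Drains an async generator of chunks into one flat array.
async function collect(source) {
  const results = [];
  for await (const chunk of source) {
    results.push(...chunk);
  }
  return results;
}
```

With a single producer, `collect(consumeChunks(produceChunks([1, 2, 3, 4, 5], 2)))` resolves to `[2, 4, 6, 8, 10]`. With several workers finishing out of order, chunk order would follow completion order instead.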
Reviewed changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `src/streaming.mjs` | New utility module providing streaming helpers including `yieldAsCompleted`, `collectAsyncGenerator`, and caching mechanisms |
| `src/threading/parallel.mjs` | Adds `stream()` method for chunk-by-chunk yielding; refactors utility functions to shared module |
| `src/generators.mjs` | Updates dependency resolution to handle async generators; adds streaming cache for collecting generator results |
| `src/generators/metadata/index.mjs` | Converts to async generator pattern with streaming support |
| `src/generators/legacy-json/index.mjs` | Converts to async generator pattern with streaming support |
| `src/generators/legacy-json-all/index.mjs` | Adds `processChunk()` implementation; converts to async generator with chunk aggregation |
| `src/generators/legacy-html/index.mjs` | Converts to async generator; extracts `replaceTemplateValues` to separate module |
| `src/generators/legacy-html-all/index.mjs` | Adds `processChunk()` implementation; converts to async generator; uses extracted `replaceTemplateValues` |
| `src/generators/legacy-html/utils/replaceTemplateValues.mjs` | New utility module extracted from legacy-html generator for template value replacement |
| `src/generators/web/index.mjs` | Adds `processChunk()` pass-through; converts to async generator while maintaining batch processing |
| `src/generators/jsx-ast/index.mjs` | Converts to async generator; extracts `getSortedHeadNodes` to separate module |
| `src/generators/jsx-ast/utils/getSortedHeadNodes.mjs` | New utility module extracted from jsx-ast generator for sorting head nodes |
| `src/generators/ast-js/index.mjs` | Converts to async generator with streaming support |
| `src/generators/api-links/__tests__/fixtures.test.mjs` | Updates test to consume async generator results |
| `bin/commands/generate.mjs` | Increases default chunk size from 10 to 20 items per worker thread |
I want to revise a few things before this PR is ready.
… thread scheduler
Pull request overview
Copilot reviewed 29 out of 33 changed files in this pull request and generated 10 comments.
Comments suppressed due to low confidence (1)
src/generators/api-links/__tests__/fixtures.test.mjs:48
- The test creates a WorkerPool but never terminates it. This can leave worker threads running after the test completes, potentially causing resource leaks or test hangs. Add `await pool.terminate();` after line 47 (before the closing brace of the test).
  const pool = new WorkerPool('../chunk-worker.mjs', cpus().length);
  const worker = createParallelWorker('ast-js', pool, {
    threads: 1,
    chunkSize: 10,
  });
  // Collect results from the async generator
  const astJsResults = [];
  for await (const chunk of astJs.generate(undefined, {
    input: [sourceFile],
    worker,
  })) {
    astJsResults.push(...chunk);
  }
  const actualOutput = await apiLinks.generate(astJsResults, {
    gitRef: 'https://github.com/nodejs/node/tree/HEAD',
  });
  for (const [k, v] of Object.entries(actualOutput)) {
    actualOutput[k] = v.replace(/.*(?=lib\/)/, '');
  }
  t.assert.snapshot(actualOutput);
});
There seems to be a bug in the generators; investigating.
Pull request overview
Copilot reviewed 31 out of 36 changed files in this pull request and generated no new comments.
Files not reviewed (1)
- npm-shrinkwrap.json: Language not supported
AugustinMauroy left a comment
To the best of my knowledge, LGTM!
And woot, Piscina is an awesome lib.
It's funny that "piscina" means pool in Portuguese.
Wasn't it Italian?
Same
avivkeller left a comment
My main concern is that a lot of the JSDoc contains types that are defined elsewhere.
Can you share examples?
I realize that my comments were poorly worded and based on incorrect assumptions. I'm really sorry for the confusion. In the future, I'll try to better articulate what it is that I see, and to make sure I have all the accurate information before leaving a review. Sorry again.
Any remaining comments can also be dealt with in a follow-up, and aren't about the implementation itself, so feel free to merge when ready.
I appreciate that. Not gonna lie, it's a bit frustrating to get quick comments that point out an issue but at times don't say exactly what the desired outcome would be. I can't guess what you want. Anyhow, I tried to address them as much as possible! Can you re-review?
Pull request overview
Copilot reviewed 44 out of 55 changed files in this pull request and generated 1 comment.
Files not reviewed (1)
- npm-shrinkwrap.json: Language not supported
Description
This PR updates our generator implementation to allow streamed processing for generators that support `processChunk`. It uses async iterators (async generators with `yield`), which lets a generator eagerly start processing data as soon as at least one chunk from one of its dependencies has finished processing.

    flowchart TB
        subgraph Main["Main Thread"]
            G[Generator Pipeline]
            SC[Streaming Cache]
        end
        subgraph Pool["Piscina Worker Pool"]
            Q[Task Queue]
            W1[Worker 1]
            W2[Worker 2]
            W3[Worker N...]
        end
        G -->|"schedules"| PW[ParallelWorker]
        PW -->|"chunks items"| Q
        Q -->|"distributes"| W1
        Q -->|"distributes"| W2
        Q -->|"distributes"| W3
        W1 -->|"yields results"| PW
        W2 -->|"yields results"| PW
        W3 -->|"yields results"| PW
        PW -->|"streams chunks"| G
        G -->|"caches collected"| SC

Piscina Worker Pool
The `createWorkerPool` function (`src/threading/index.mjs`) creates a Piscina worker pool for parallel processing. Piscina is a battle-tested worker pool library maintained by Node.js core team members (Matteo Collina, James Snell, etc.) with ~6.5M weekly downloads. The pool is configured with a fixed thread count and `idleTimeout: Infinity` to keep workers alive for reuse across generators.

Chunk Distribution & Streaming Results
The `createParallelWorker` function (`src/threading/parallel.mjs`) splits input items into chunks using `createChunks()` based on the configured `chunkSize`. Tasks are submitted to Piscina in parallel via `pool.run()`, and results are yielded as they complete using `Promise.race`: each completed task is replaced with a never-resolving promise so the next `Promise.race` picks the next fastest result. This ensures results stream to dependent generators in completion order rather than submission order.

sliceInput Optimization
Generators can opt into the `sliceInput` optimization by setting `processChunk.sliceInput = true`. When enabled, only the items at the specified indices are sent to workers instead of the full input array. This reduces serialization overhead for generators like `metadata` and `jsx-ast` that don't need full context to process individual items.

Streaming Cache & Dependency Resolution
`createStreamingCache` (`src/streaming.mjs`) ensures that when multiple generators depend on the same async generator source, `collectAsyncGenerator()` runs only once and all dependents share the cached result via `getOrCollect()`. Generators implement `processChunk(fullInput, itemIndices, options)` for worker-side processing and `async *generate()` for main-thread orchestration, yielding chunks as they complete to enable pipeline parallelism throughout the generator dependency chain.

Logger Enhancements
The logger (`src/logger/logger.mjs`) now tracks child loggers in a `children` Set, with `setLogLevel()` propagating level changes to all descendants automatically. The console transport (`src/logger/transports/console.mjs`) displays metadata objects inline in magenta using `styleText('magenta', JSON.stringify(metadata))`, providing better visibility into structured log data during debugging.
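The level propagation described under Logger Enhancements might look roughly like the sketch below. `createLogger`, the level names, and the `sink` parameter are assumptions made for illustration, not the API of `src/logger/logger.mjs`; the magenta styling is left out so the sketch needs no imports.

```javascript
// Hypothetical level names and weights; the PR's logger may differ.
const LEVELS = { debug: 10, info: 20, warn: 30, error: 40 };

// Hypothetical factory: each logger keeps its children in a Set so a
// level change can be pushed down to every descendant.
function createLogger(level = 'info', sink = console.log) {
  const children = new Set();
  let currentLevel = level;

  return {
    child() {
      const childLogger = createLogger(currentLevel, sink);
      children.add(childLogger);
      return childLogger;
    },
    setLogLevel(nextLevel) {
      currentLevel = nextLevel;
      // Recursing through each child reaches grandchildren as well.
      for (const childLogger of children) {
        childLogger.setLogLevel(nextLevel);
      }
    },
    getLogLevel() {
      return currentLevel;
    },
    log(messageLevel, message, metadata) {
      if (LEVELS[messageLevel] < LEVELS[currentLevel]) return;
      // Metadata is appended inline as JSON, without color in this sketch.
      const suffix = metadata ? ` ${JSON.stringify(metadata)}` : '';
      sink(`[${messageLevel}] ${message}${suffix}`);
    },
  };
}
```

Calling `setLogLevel('debug')` on a root logger created this way also switches loggers obtained through `root.child().child()`, since each level forwards the change to its own children.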
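For the chunking and `sliceInput` behavior described in the Chunk Distribution and sliceInput Optimization sections, here is a small sketch under stated assumptions. `createIndexChunks` and `buildTaskPayload` are hypothetical names, and the payload shape and index renumbering are guesses at one reasonable design, not the PR's actual `createChunks()` or worker protocol.

```javascript
// Hypothetical chunking step: splits the indices 0..count-1 into
// consecutive groups of at most `size` indices each.
function createIndexChunks(count, size) {
  const chunks = [];
  for (let start = 0; start < count; start += size) {
    const end = Math.min(start + size, count);
    chunks.push(Array.from({ length: end - start }, (_, i) => start + i));
  }
  return chunks;
}

// Hypothetical payload builder for one worker task. With `sliceInput`,
// only the referenced items are sent, and the indices are renumbered so
// they point into the smaller array.
function buildTaskPayload(fullInput, itemIndices, sliceInput) {
  if (!sliceInput) {
    return { input: fullInput, itemIndices };
  }
  return {
    input: itemIndices.map(index => fullInput[index]),
    itemIndices: itemIndices.map((_, position) => position),
  };
}
```

For example, `createIndexChunks(5, 2)` returns `[[0, 1], [2, 3], [4]]`, and `buildTaskPayload(['a', 'b', 'c', 'd', 'e'], [2, 3], true)` returns `{ input: ['c', 'd'], itemIndices: [0, 1] }`, so the worker receives two items instead of five.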
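The as-completed technique from the Chunk Distribution & Streaming Results section (racing all tasks, then swapping each finished task for a never-resolving promise) can be sketched as follows. `raceAsCompleted` is a hypothetical name; this is an illustration of the pattern, not the `yieldAsCompleted` helper from `src/streaming.mjs`.

```javascript
// Yields the value of each promise as soon as it resolves, in completion
// order rather than submission order. A rejection propagates to the consumer.
async function* raceAsCompleted(promises) {
  // Tag each promise with its index so the race winner can be identified.
  const pending = promises.map((promise, index) =>
    promise.then(value => ({ index, value }))
  );
  // A promise that never settles, so it can never win a race.
  const never = new Promise(() => {});

  for (let remaining = pending.length; remaining > 0; remaining--) {
    const { index, value } = await Promise.race(pending);
    // Swap out the finished task so the next race picks the next fastest.
    pending[index] = never;
    yield value;
  }
}
```

Iterating with `for await (const result of raceAsCompleted(tasks))` hands each result to the consumer the moment its task finishes, which is what lets dependent generators start early.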
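The collect-once behavior described under Streaming Cache & Dependency Resolution could be implemented along these lines. `createCollectCache`, `collectAll`, and the `getOrCollect(name, source)` signature are assumptions for illustration; the real signatures in `src/streaming.mjs` may differ.

```javascript
// Drains an async generator of chunks into a single flat array.
async function collectAll(source) {
  const results = [];
  for await (const chunk of source) {
    results.push(...chunk);
  }
  return results;
}

// Hypothetical cache: each named source is collected at most once.
function createCollectCache() {
  const cache = new Map();
  return {
    getOrCollect(name, source) {
      if (!cache.has(name)) {
        // Store the promise itself, so callers that arrive while the
        // collection is still running share the same in-flight work.
        cache.set(name, collectAll(source));
      }
      return cache.get(name);
    },
  };
}
```

Caching the promise rather than the finished array is the design choice that matters here: two dependents asking for the same source at the same time both await one collection instead of starting two.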