From ba4a829612f143b548d6d5961432f693969b6f23 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 3 Dec 2025 14:58:32 +0200 Subject: [PATCH 1/2] use Queue for streams --- src/execution/IncrementalGraph.ts | 145 ++++--- src/execution/IncrementalPublisher.ts | 10 +- src/execution/Queue.ts | 354 +++++++++++++--- src/execution/__tests__/Queue-test.ts | 571 ++++++++++++++++++++++---- src/execution/execute.ts | 331 +++++---------- src/execution/types.ts | 8 +- 6 files changed, 993 insertions(+), 426 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 971e08a271..786c956fa2 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,6 +1,7 @@ import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import { invariant } from '../jsutils/invariant.js'; import { isPromise } from '../jsutils/isPromise.js'; +import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import type { GraphQLError } from '../error/GraphQLError.js'; @@ -11,7 +12,6 @@ import type { IncrementalDataRecord, IncrementalDataRecordResult, PendingExecutionGroup, - StreamItemRecord, StreamRecord, SuccessfulExecutionGroup, } from './types.js'; @@ -20,20 +20,26 @@ import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js'; /** * @internal */ -export class IncrementalGraph { +export class IncrementalGraph { private _rootNodes: Set; - private _completed: Queue; + private _completed: AsyncGenerator; // _push and _stop are assigned in the executor which is executed // synchronously by the Queue constructor. - private _push!: (item: IncrementalDataRecordResult) => void; + private _push!: (item: IncrementalDataRecordResult) => PromiseOrValue; private _stop!: () => void; - constructor() { + constructor( + reducer: ( + generator: Generator, + ) => U | undefined, + ) { this._rootNodes = new Set(); - this._completed = new Queue((push, stop) => { - this._push = push; - this._stop = stop; - }); + this._completed = new Queue( + ({ push, stop }) => { + this._push = push; + this._stop = stop; + }, + ).subscribe(reducer); } getNewRootNodes( @@ -145,10 +151,8 @@ export class IncrementalGraph { this._maybeStop(); } - subscribe( - mapFn: (generator: Generator) => U | undefined, - ): AsyncGenerator { - return this._completed.subscribe(mapFn); + subscribe(): AsyncGenerator { + return this._completed; } private _addIncrementalDataRecords( @@ -238,77 +242,68 @@ export class IncrementalGraph { const value = completedExecutionGroup.value; if (isPromise(value)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises - value.then((resolved) => this._push(resolved)); + value.then((resolved) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._push(resolved); + }); } else { + // eslint-disable-next-line @typescript-eslint/no-floating-promises this._push(value); } } private async _onStreamItems(streamRecord: StreamRecord): Promise { - let items: Array = []; - let errors: Array = []; - let newDeferredFragmentRecords: Array = []; - let incrementalDataRecords: Array = []; const streamItemQueue = streamRecord.streamItemQueue; - let streamItemRecord: StreamItemRecord | undefined; - while ((streamItemRecord = streamItemQueue.shift()) !== undefined) { - let result = - streamItemRecord instanceof BoxedPromiseOrValue - ? 
streamItemRecord.value - : streamItemRecord().value; - if (isPromise(result)) { - if (items.length > 0) { - this._push({ - streamRecord, - result: - // TODO add additional test case or rework for coverage - errors.length > 0 /* c8 ignore start */ - ? { items, errors } /* c8 ignore stop */ - : { items }, - newDeferredFragmentRecords, - incrementalDataRecords, - }); - items = []; - errors = []; - newDeferredFragmentRecords = []; - incrementalDataRecords = []; + let closed = false; + try { + await streamItemQueue.forEachBatch((streamItemResults) => { + const items: Array = []; + const errors: Array = []; + const newDeferredFragmentRecords: Array = []; + const incrementalDataRecords: Array = []; + + for (const result of streamItemResults) { + items.push(result.item); + if (result.errors !== undefined) { + errors.push(...result.errors); + } + if (result.newDeferredFragmentRecords !== undefined) { + newDeferredFragmentRecords.push( + ...result.newDeferredFragmentRecords, + ); + } + if (result.incrementalDataRecords !== undefined) { + incrementalDataRecords.push(...result.incrementalDataRecords); + } } - // eslint-disable-next-line no-await-in-loop - result = await result; - // wait an additional tick to coalesce resolving additional promises - // within the queue - // eslint-disable-next-line no-await-in-loop - await Promise.resolve(); - } - if (result.item === undefined) { - if (items.length > 0) { - this._push({ - streamRecord, - result: errors.length > 0 ? { items, errors } : { items }, - newDeferredFragmentRecords, - incrementalDataRecords, - }); + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._push({ + streamRecord, + result: + // TODO add additional test case or rework for coverage + errors.length > 0 /* c8 ignore start */ + ? { items, errors } /* c8 ignore stop */ + : { items }, + newDeferredFragmentRecords, + incrementalDataRecords, + }); + + if (streamItemQueue.isStopped()) { + closed = true; + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._push({ streamRecord }); } - this._push( - result.errors === undefined - ? 
{ streamRecord } - : { - streamRecord, - errors: result.errors, - }, - ); - return; - } - items.push(result.item); - if (result.errors !== undefined) { - errors.push(...result.errors); - } - if (result.newDeferredFragmentRecords !== undefined) { - newDeferredFragmentRecords.push(...result.newDeferredFragmentRecords); - } - if (result.incrementalDataRecords !== undefined) { - incrementalDataRecords.push(...result.incrementalDataRecords); - } + }); + } catch (error) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._push({ streamRecord, errors: [error] }); + return; + } + + if (!closed) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this._push({ streamRecord }); } } diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index ab07f9ac76..2fda046dff 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -62,7 +62,7 @@ class IncrementalPublisher { private _earlyReturns: Map Promise>; private _abortSignalListener: AbortSignalListener | undefined; private _nextId: number; - private _incrementalGraph: IncrementalGraph; + private _incrementalGraph: IncrementalGraph; constructor( earlyReturns: Map Promise>, @@ -71,7 +71,9 @@ class IncrementalPublisher { this._earlyReturns = earlyReturns; this._abortSignalListener = abortSignalListener; this._nextId = 0; - this._incrementalGraph = new IncrementalGraph(); + this._incrementalGraph = new IncrementalGraph((batch) => + this._handleCompletedBatch(batch), + ); } buildResponse( @@ -93,9 +95,7 @@ class IncrementalPublisher { ? { errors, data, pending, hasNext: true } : { data, pending, hasNext: true }; - const subsequentResults = this._incrementalGraph.subscribe((batch) => - this._handleCompletedBatch(batch), - ); + const subsequentResults = this._incrementalGraph.subscribe(); return { initialResult, diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index bd9a9a076d..ddab302b53 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -1,95 +1,345 @@ +import { invariant } from '../jsutils/invariant.js'; import { isPromise } from '../jsutils/isPromise.js'; import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import { withCleanup } from './withCleanup.js'; +type Settled = + | { status: 'fulfilled'; value: T } + | { status: 'rejected'; reason: unknown }; + +interface ItemEntry { + kind: 'item'; + settled?: Settled; +} + +interface StopEntry { + kind: 'stop'; +} + +type Entry = ItemEntry | StopEntry; + +interface BatchRequest { + resolve: (generator: Generator | undefined) => void; + reject: (reason: unknown) => void; +} + +interface QueueExecutorOptions { + push: (item: PromiseOrValue) => PromiseOrValue; + stop: (reason?: unknown) => void; + started: Promise; + stopped: Promise; +} + /** + * A Queue is a lightweight async-generator primitive inspired by Brian Kim's + * Repeater (https://repeater.js.org, https://github.com/repeaterjs/repeater). + * The ergonomics are similar, but this implementation favors clarity over + * performance and gives producers flexibility to remain lazy, become eager, or + * live somewhere in between. + * + * The constructor takes an executor function and an optional `initialCapacity`. + * Executors receive `{ push, stop, started, stopped }` and may return `void` or + * a promise if they perform asynchronous setup. 
They call `push` whenever
+ * another item is ready, call `stop` when no more values will be produced
+ * (optionally supplying an error), await `started` when setup should run only
+ * after iteration begins, and await `stopped` to observe when the queue
+ * terminates. Because `push` and `stop` are plain functions, executors can
+ * hoist them into outside scopes or pass them to helpers. If the executor
+ * throws or its returned promise rejects, the queue treats it as `stop(error)`
+ * and propagates the failure.
+ *
+ * The `initialCapacity` argument (default `1`) governs backpressure. Capacity
+ * is the maximum number of buffered items allowed before a push must wait.
+ * When the backlog reaches capacity, `push` returns a promise that settles
+ * once consumption releases space; otherwise it returns `undefined`. Setting
+ * capacity to `1` yields a fully lazy queue (each push waits until its item
+ * has been consumed); higher capacities buffer that many items eagerly.
+ * Capacity can be changed later via `setCapacity` and observed via
+ * `getCapacity`.
+ *
+ * `subscribe(reducer)` returns an async generator whose batches feed a
+ * generator of settled values into the reducer; whatever the reducer returns
+ * (other than `undefined`) becomes the yielded value for that batch. Calling
+ * `return()` on the subscription settles pending `next` calls thanks to
+ * `withCleanup`, providing direct abort semantics rather than leaving `next()`
+ * suspended.
+ *
+ * `forEachBatch(reducer)` is a convenience method that subscribes with the
+ * given reducer and runs it for each batch until the queue stops.
+ *
+ * Producers can stay lazy by awaiting `started`, keeping the default capacity
+ * of `1`, and awaiting each `push`. Skipping those waits while raising
+ * capacity makes the queue eager up to its configured limit. The `isStopped()`
+ * helper exposes whether the queue has fully stopped, which can be useful when
+ * the reducer performs external work and wants to bail early without awaiting
+ * another `next`.
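+ *
+ * A minimal usage sketch (illustrative only; the default `subscribe()`
+ * reducer collects each batch of settled values into an array):
+ *
+ * ```ts
+ * const queue = new Queue<number>(({ push, stop }) => {
+ *   push(1); // synchronous item
+ *   push(Promise.resolve(2)); // batches pause behind pending promises
+ *   stop();
+ * });
+ * for await (const batch of queue.subscribe()) {
+ *   console.log(batch); // e.g. [1], then [2]
+ * }
+ * ```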
+ * * @internal */ export class Queue { - private _items: Array; - private _stopped: boolean; - private _resolvers: Array<(iterable: Generator | undefined) => void>; + private _capacity: number; + private _backlog = 0; + private _waiters: Array<() => void> = []; + private _entries: Array> = []; + private _isStopped = false; + private _stopRequested = false; + private _batchRequests = new Set>(); + + private _resolveStarted: () => void; + private _resolveStopped: () => void; constructor( - executor: ( - push: (item: T) => void, - stop: () => void, - ) => PromiseOrValue, + executor: ({ + push, + stop, + started, + stopped, + }: QueueExecutorOptions) => PromiseOrValue, + initialCapacity = 1, ) { - this._items = []; - this._stopped = false; - this._resolvers = []; - let result; + this._capacity = this._normalizeCapacity(initialCapacity); + + const { promise: started, resolve: resolveStarted } = + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + promiseWithResolvers(); + + this._resolveStarted = resolveStarted; + const { promise: stopped, resolve: resolveStopped } = + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + promiseWithResolvers(); + this._resolveStopped = resolveStopped; + try { - result = executor(this._push.bind(this), this.stop.bind(this)); - } catch { - // ignore sync executor errors - } - if (isPromise(result)) { - result.catch(() => { - /* ignore async executor errors */ + const result = executor({ + push: this._push.bind(this), + stop: this._stop.bind(this), + started, + stopped, }); + if (isPromise(result)) { + result.catch((error: unknown) => this._stop(error)); + } + } catch (error) { + this._stop(error); } } - stop(): void { - this._stopped = true; - this._resolve(undefined); - } - subscribe( - mapFn: (generator: Generator) => U | undefined, + reducer: ( + generator: Generator, + ) => PromiseOrValue | undefined = (generator) => + Array.from(generator) as U, ): AsyncGenerator { - return withCleanup(this.subscribeImpl(mapFn), () => this.stop()); + const iterator = this._iteratorLoop(reducer); + return withCleanup(iterator, () => { + for (const entry of this._entries) { + if (entry.kind === 'item') { + this._release(); + } + } + this._entries.length = 0; + this._batchRequests.forEach((request) => request.resolve(undefined)); + this._batchRequests.clear(); + this._stop(); + }); + } + + async forEachBatch( + reducer: (generator: Generator) => PromiseOrValue, + ): Promise { + const sub = this.subscribe(async (generator) => { + const { promise: drained, resolve } = + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + promiseWithResolvers(); + + const wrappedBatch = (function* wrapper(): Generator { + yield* generator; + resolve(); + })(); + + await Promise.all([reducer(wrappedBatch), drained]); + }); + + for await (const _ of sub /* c8 ignore start */) { + // intentionally empty + } /* c8 ignore stop */ + } + + setCapacity(nextCapacity: number): void { + this._capacity = this._normalizeCapacity(nextCapacity); + this._flush(); + } + + getCapacity(): number { + return this._capacity; + } + + isStopped(): boolean { + return this._isStopped; } - private async *subscribeImpl( - mapFn: (generator: Generator) => U | undefined, + private _normalizeCapacity(capacity: number): number { + return Math.max(1, Math.floor(capacity)); + } + + private _flush(): void { + while (this._waiters.length > 0 && this._backlog < this._capacity) { + this._waiters.shift()?.(); + } + } + + private _reserve(): PromiseOrValue { + this._backlog += 1; + 
if (this._backlog < this._capacity) { + return undefined; + } + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + const { promise, resolve } = promiseWithResolvers(); + this._waiters.push(resolve); + return promise; + } + + private _release(): void { + if (this._backlog > 0) { + this._backlog -= 1; + } + this._flush(); + } + + private async *_iteratorLoop( + reducer: ( + generator: Generator, + ) => PromiseOrValue | undefined, ): AsyncGenerator { + this._resolveStarted(); let nextBatch: Generator | undefined; // eslint-disable-next-line no-await-in-loop - while ((nextBatch = await this._nextBatch()) !== undefined) { - const mapped = mapFn(nextBatch); - if (mapped !== undefined) { - yield mapped; + while ((nextBatch = await this._waitForNextBatch())) { + let reduced = reducer(nextBatch); + if (isPromise(reduced)) { + // eslint-disable-next-line no-await-in-loop + reduced = await reduced; } + if (reduced === undefined) { + continue; + } + yield reduced; } } - private _nextBatch(): Promise | undefined> { - if (this._items.length) { - return Promise.resolve(this.batch()); - } - if (this._stopped) { - return Promise.resolve(undefined); - } - const { promise, resolve } = promiseWithResolvers< + private _waitForNextBatch(): Promise | undefined> { + const { promise, resolve, reject } = promiseWithResolvers< Generator | undefined >(); - this._resolvers.push(resolve); + this._batchRequests.add({ resolve, reject }); + this._deliverBatchIfReady(); return promise; } - private _push(item: T): void { - if (!this._stopped) { - this._items.push(item); - this._resolve(this.batch()); + private _push(item: PromiseOrValue): PromiseOrValue { + if (this._stopRequested) { + return undefined; } + const maybePushPromise = this._reserve(); + if (isPromise(item)) { + const entry: ItemEntry = { kind: 'item' }; + this._entries.push(entry); + item.then( + (resolved) => { + entry.settled = { status: 'fulfilled', value: resolved }; + this._deliverBatchIfReady(); + }, + (reason: unknown) => { + entry.settled = { status: 'rejected', reason }; + this._deliverBatchIfReady(); + }, + ); + } else { + this._entries.push({ + kind: 'item', + settled: { status: 'fulfilled', value: item }, + }); + this._deliverBatchIfReady(); + } + return maybePushPromise; } - private _resolve(maybeIterable: Generator | undefined): void { - for (const resolve of this._resolvers) { - resolve(maybeIterable); + private _stop(reason?: unknown): void { + if (this._stopRequested) { + return; + } + this._stopRequested = true; + if (reason === undefined) { + if (this._entries.length === 0) { + this._isStopped = true; + this._resolveStopped(); + this._deliverBatchIfReady(); + return; + } + + this._entries.push({ kind: 'stop' }); + this._deliverBatchIfReady(); + return; } - this._resolvers = []; + + this._entries.push({ + kind: 'item', + settled: { status: 'rejected', reason }, + }); + this._entries.push({ kind: 'stop' }); + this._deliverBatchIfReady(); } - private *batch(): Generator { - let item: T | undefined; - while ((item = this._items.shift()) !== undefined) { - yield item; + private _deliverBatchIfReady(): void { + if (!this._batchRequests.size) { + return; + } + const headEntry = this._entries[0]; + const requests = this._batchRequests; + if (headEntry !== undefined) { + // stop sentinel always follows other work + invariant(headEntry.kind !== 'stop'); + + const settled = headEntry.settled; + if (settled !== undefined) { + if (settled.status === 'fulfilled') { + this._batchRequests = new Set(); + requests.forEach((request) => 
request.resolve(this._drainBatch())); + return; + } + this._entries.shift(); + this._release(); + this._isStopped = true; + this._resolveStopped(); + this._batchRequests = new Set(); + requests.forEach((request) => request.reject(settled.reason)); + } + } else if (this._isStopped) { + this._batchRequests = new Set(); + requests.forEach((request) => request.resolve(undefined)); + } + } + + private *_drainBatch(): Generator { + while (true) { + const entry = this._entries[0]; + if (entry === undefined) { + return; + } + if (entry.kind === 'stop') { + this._isStopped = true; + this._entries.shift(); + this._resolveStopped(); + return; + } + const settled = entry.settled; + if (settled === undefined || settled.status === 'rejected') { + return; + } + this._entries.shift(); + this._release(); + yield settled.value; } } } diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index 405deaa6b1..1706b3128d 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -1,36 +1,48 @@ +/* eslint-disable @typescript-eslint/no-floating-promises */ + import { expect } from 'chai'; import { describe, it } from 'mocha'; +import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; +import { invariant } from '../../jsutils/invariant.js'; +import { isPromise } from '../../jsutils/isPromise.js'; +import type { PromiseOrValue } from '../../jsutils/PromiseOrValue.js'; +import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js'; + import { Queue } from '../Queue.js'; describe('Queue', () => { - it('should yield sync pushed items in order', async () => { - const queue = new Queue((push) => { + it('should yield sync items pushed synchronously', async () => { + const sub = new Queue(({ push }) => { push(1); push(2); push(3); - }); + }).subscribe(); - const sub = queue.subscribe((batch) => Array.from(batch)); - expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3], + }); }); - it('should yield async pushed items in order', async () => { - const queue = new Queue(async (push) => { + it('should yield sync items pushed after initial delay', async () => { + const sub = new Queue(async ({ push }) => { await resolveOnNextTick(); push(1); push(2); push(3); - }); + }).subscribe(); - const sub = queue.subscribe((batch) => Array.from(batch)); - expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3], + }); }); - it('should yield sync and async pushed items in order', async () => { - const queue = new Queue(async (push) => { + it('should yield sync items pushed prior to and after delay', async () => { + const sub = new Queue(async ({ push }) => { push(1); push(2); push(3); @@ -38,17 +50,16 @@ describe('Queue', () => { push(4); push(5); push(6); - }); + }).subscribe(); - const sub = queue.subscribe((batch) => Array.from(batch)); expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3, 4, 5, 6], }); }); - it('should yield sync and async pushed items in order, separated by macro-task boundary', async () => { - const queue = new Queue(async (push) => { + it('should yield sync items pushed prior to and after macro-task boundary', async () => { + const sub = new Queue(async ({ push }) => { push(1); push(2); push(3); @@ -57,9 +68,8 @@ describe('Queue', () 
=> { push(4); push(5); push(6); - }); + }).subscribe(); - const sub = queue.subscribe((batch) => Array.from(batch)); expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3], @@ -70,8 +80,8 @@ describe('Queue', () => { }); }); - it('should yield multiple async batches', async () => { - const queue = new Queue(async (push) => { + it('should yield multiple batches of sync items', async () => { + const sub = new Queue(async ({ push }) => { for (let i = 1; i <= 28; i += 3) { // eslint-disable-next-line no-await-in-loop await resolveOnNextTick(); @@ -79,101 +89,134 @@ describe('Queue', () => { push(i + 1); push(i + 2); } - }); + }).subscribe(); - const sub = queue.subscribe((batch) => Array.from(batch)[0]); - expect(await sub.next()).to.deep.equal({ done: false, value: 1 }); - expect(await sub.next()).to.deep.equal({ done: false, value: 4 }); - expect(await sub.next()).to.deep.equal({ done: false, value: 16 }); - expect(await sub.next()).to.deep.equal({ done: false, value: 28 }); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], + }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27], + }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [28, 29, 30], + }); }); it('should allow the executor to indicate completion', async () => { - const queue = new Queue((push, stop) => { + const sub = new Queue(({ push, stop }) => { push(1); stop(); - }); + }).subscribe(); - const sub = queue.subscribe((batch) => Array.from(batch)); expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); }); + it('returns stopped state synchronously when completed before push', () => { + let stop!: (reason?: unknown) => void; + const queue = new Queue(({ stop: savedStop }) => { + stop = savedStop; + }); + + expect(queue.isStopped()).to.equal(false); + stop(); + expect(queue.isStopped()).to.equal(true); + }); + + it('reports stopped after flushing remaining work', async () => { + const queue = new Queue(({ push, stop }) => { + push(1); + stop(); + }); + const sub = queue.subscribe(); + + expect(queue.isStopped()).to.equal(false); + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + expect(queue.isStopped()).to.equal(true); + }); + it('should allow the executor to indicate completion prior to any push calls', async () => { - const queue = new Queue((push, stop) => { + const sub = new Queue(({ push, stop }) => { stop(); push(1); // should be ignored - }); + }).subscribe(); - const sub = queue.subscribe((batch) => batch); expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); }); + it('should resolve a pending next call when stopped before any pushes', async () => { + let stop!: (reason?: unknown) => void; + const sub = new Queue(({ stop: savedStop }) => { + stop = savedStop; + }).subscribe(); + + const nextPromise = sub.next(); + + stop(); + + expect(await nextPromise).to.deep.equal({ done: true, value: undefined }); + }); + it('should allow a consumer to abort a pending call to next', async () => { - const queue = new Queue(() => { + const sub = new Queue(() => { // no pushes - }); + }).subscribe(); - const sub = queue.subscribe((batch) => batch); const nextPromise = sub.next(); - queue.stop(); + await sub.return(); expect(await nextPromise).to.deep.equal({ done: true, 
value: undefined }); }); it('should allow saving the push function', async () => { - let push!: (item: number) => void; - const queue = new Queue((_push) => { - push = _push; - }); - - const sub = queue.subscribe((batch) => Array.from(batch)); + let push!: (item: number) => PromiseOrValue; + const sub = new Queue(({ push: savedPush }) => { + push = savedPush; + }).subscribe(); await resolveOnNextTick(); push(1); push(2); push(3); - expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3], + }); }); - it('should ignore sync error in the executor', async () => { - let push!: (item: number) => void; - const queue = new Queue((_push) => { - push = _push; + it('delivers queued items before rejecting on sync executor error', async () => { + const sub = new Queue(({ push }) => { + push(1); throw new Error('Oops'); - }); + }).subscribe(); - push(1); - - const sub = queue.subscribe((batch) => Array.from(batch)); expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + await expectPromise(sub.next()).toRejectWith('Oops'); }); - it('should ignore async error in the executor', async () => { - let push!: (item: number) => void; - const queue = new Queue(async (_push) => { - push = _push; + it('delivers queued items before rejecting on async executor error', async () => { + const sub = new Queue(async ({ push }) => { + push(1); await resolveOnNextTick(); throw new Error('Oops'); - }); - - await resolveOnNextTick(); - push(1); + }).subscribe(); - const sub = queue.subscribe((batch) => Array.from(batch)); expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + await expectPromise(sub.next()).toRejectWith('Oops'); }); - it('should skip payloads when mapped to undefined, skipping first async payload', async () => { - const queue = new Queue(async (push) => { + it('should skip payloads when reduced to undefined, skipping first async payload', async () => { + const sub = new Queue(async ({ push }) => { for (let i = 1; i <= 14; i += 1) { // eslint-disable-next-line no-await-in-loop await resolveOnNextTick(); push(i); } - }); - - const sub = queue.subscribe((batch) => { + }).subscribe((batch) => { const arr = Array.from(batch); if (arr[0] % 2 === 0) { return arr; @@ -184,19 +227,76 @@ describe('Queue', () => { expect(await sub.next()).to.deep.equal({ done: false, value: [14] }); }); - it('should condense pushes during map into the same batch', async () => { - let push!: (item: number) => void; - const queue = new Queue((_push) => { - push = _push; + it('accepts async reducer functions', async () => { + const sub = new Queue(({ push, stop }) => { + push(1); + push(2); + stop(); + }).subscribe(async (batch) => { + await resolveOnNextTick(); + return Array.from(batch); + }); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2] }); + expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('forEachBatch resolves once the queue is drained', async () => { + const batches: Array> = []; + const queue = new Queue(async ({ push, stop }) => { + push(1); + await new Promise((resolve) => setTimeout(resolve)); + push(2); + stop(); + }); + + await queue.forEachBatch((batch) => { + batches.push(Array.from(batch)); + }); + + expect(batches).to.deep.equal([[1], [2]]); + }); + + it('allows async reducers to drain the batch later', async () => { + const queue = new Queue(({ push, stop }) => { + push(1); + push(2); + stop(); + }); + + const batches: Array> = []; 
+ const finished = queue.forEachBatch((batch) => { + batches.push(batch); }); await resolveOnNextTick(); - push(1); - push(2); + const results = batches.flatMap((batch) => Array.from(batch)); + + await finished; + + expect(results).to.deep.equal([1, 2]); + }); + + it('forEachBatch rejects when the reducer throws', async () => { + const queue = new Queue(({ push }) => { + push(1); + }); + + await expectPromise( + queue.forEachBatch(() => { + throw new Error('Oops'); + }), + ).toRejectWith('Oops'); + }); + + it('should condense pushes reduced into the same batch', async () => { + let push!: (item: number) => PromiseOrValue; const itemsToAdd = [3, 4]; const items: Array = []; - const sub = queue.subscribe((batch) => { + const sub = new Queue(({ push: savedPush }) => { + push = savedPush; + }).subscribe((batch) => { for (const item of batch) { const itemToAdd = itemsToAdd.shift(); if (itemToAdd !== undefined) { @@ -206,9 +306,336 @@ describe('Queue', () => { } return items; }); + + await resolveOnNextTick(); + push(1); + push(2); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3, 4], }); }); + + it('exposes capacity controllers for fine-grained backpressure', async () => { + let push!: (item: number) => PromiseOrValue; + const queue = new Queue(({ push: savedPush }) => { + push = savedPush; + }, 1); + + const sub = queue.subscribe(); + + expect(queue.getCapacity()).to.equal(1); + queue.setCapacity(3); + + expect(push(1)).to.equal(undefined); + expect(push(2)).to.equal(undefined); + const push3 = push(3); + let resumed = false; + invariant(isPromise(push3)); + push3.then(() => { + resumed = true; + }); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + await Promise.resolve(push3); + expect(resumed).to.equal(true); + await sub.return(); + }); + + it('resolves pending pushes when capacity increases', async () => { + let push!: (item: number) => PromiseOrValue; + const queue = new Queue(({ push: savedPush }) => { + push = savedPush; + }, 1); + + const sub = queue.subscribe(); + + const push1 = push(1); + const push2 = push(2); + let resolved1 = false; + let resolved2 = false; + invariant(isPromise(push1)); + push1.then(() => { + resolved1 = true; + }); + invariant(isPromise(push2)); + push2.then(() => { + resolved2 = true; + }); + + await resolveOnNextTick(); + expect(resolved1).to.equal(false); + expect(resolved2).to.equal(false); + + expect(queue.getCapacity()).to.equal(1); + queue.setCapacity(3); + + await resolveOnNextTick(); + expect(resolved1).to.equal(true); + expect(resolved2).to.equal(true); + + await sub.return(); + }); + + it('wakes waiting next calls when batches finish', async () => { + let resolvePush!: (value: number) => void; + const sub = new Queue(({ push }) => { + push(1); + push( + new Promise((resolve) => { + resolvePush = resolve; + }), + ); + }).subscribe(); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + + const nextPromise = sub.next(); + resolvePush(2); + + expect(await nextPromise).to.deep.equal({ done: false, value: [2] }); + + const thirdPromise = sub.next(); + await sub.return(); + + expect(await thirdPromise).to.deep.equal({ done: true, value: undefined }); + }); + + it('should yield promised items in order once resolved', async () => { + const sub = new Queue(({ push }) => { + push(Promise.resolve(1)); + push(Promise.resolve(2)); + push(Promise.resolve(3)); + push(Promise.resolve(4)); + push(Promise.resolve(5)); + push(Promise.resolve(6)); + }).subscribe(); + + expect(await 
sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3, 4, 5, 6], + }); + }); + + it('should yield promised items in order even if stopped', async () => { + const sub = new Queue(({ push, stop }) => { + push(Promise.resolve(1)); + stop(); + }).subscribe(); + + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1], + }); + expect(await sub.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + + it('should pause batches behind pending promises', async () => { + let resolve2!: (value: number) => void; + const sub = new Queue(({ push }) => { + push(1); + const { promise, resolve } = promiseWithResolvers(); + resolve2 = () => resolve(2); + push(promise); + push(3); + }).subscribe(); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + + resolve2(2); + + expect(await sub.next()).to.deep.equal({ done: false, value: [2, 3] }); + }); + + it('should pause batches behind pending promises', async () => { + let resolve2!: (value: number) => void; + let resolve5!: (value: number) => void; + const sub = new Queue(({ push }) => { + push(1); + const { promise: promise2, resolve: _resolve2 } = + promiseWithResolvers(); + resolve2 = _resolve2; + push(promise2); + push(3); + push(4); + const { promise: promise5, resolve: _resolve5 } = + promiseWithResolvers(); + resolve5 = _resolve5; + push(promise5); + push(6); + push(7); + }).subscribe(); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + + resolve2(2); + + expect(await sub.next()).to.deep.equal({ done: false, value: [2, 3, 4] }); + + resolve5(5); + + expect(await sub.next()).to.deep.equal({ done: false, value: [5, 6, 7] }); + }); + + it('should abort on errored promise with a pending next', async () => { + let reject!: (reason: unknown) => void; + const sub = new Queue(({ push }) => { + push(1); + const { promise, reject: _reject } = promiseWithResolvers(); + reject = _reject; + push(promise); + push(3); + }).subscribe(); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + + const nextPromise = sub.next(); + + reject(new Error('Oops')); + + await expectPromise(nextPromise).toRejectWith('Oops'); + }); + + it('should abort on errored promise without pending next', async () => { + let reject!: (reason: unknown) => void; + const sub = new Queue(({ push }) => { + push(1); + const { promise, reject: _reject } = promiseWithResolvers(); + reject = _reject; + push(promise); + push(3); + }).subscribe(); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + + reject(new Error('Oops')); + + await resolveOnNextTick(); + await resolveOnNextTick(); + + await expectPromise(sub.next()).toRejectWith('Oops'); + }); + + it('should abort on errored promise after resuming from normal promise', async () => { + const sub = new Queue(({ push }) => { + push(Promise.resolve(1)); + push(Promise.reject(new Error('Oops'))); + push(3); + }).subscribe(); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + + await expectPromise(sub.next()).toRejectWith('Oops'); + }); + + it('should resolve push promise when an item is consumed', async () => { + let pushed = false; + const sub = new Queue(({ push }) => { + const push1 = push(1); + invariant(isPromise(push1)); + push1.then(() => { + pushed = true; + }); + }).subscribe(); + + expect(pushed).to.equal(false); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + expect(pushed).to.equal(true); + }); + + it('should resolve push promise when stopped before consumption', async 
() => { + let pushed1 = false; + let pushed2 = false; + const sub = new Queue(({ push }) => { + const push1 = push(1); + invariant(isPromise(push1)); + push1.then(() => { + pushed1 = true; + }); + const push2 = push( + new Promise(() => { + // never resolve + }), + ); + invariant(isPromise(push2)); + push2.then(() => { + pushed2 = true; + }); + }).subscribe(); + + expect(pushed1).to.equal(false); + expect(pushed2).to.equal(false); + + await sub.return(); + + await resolveOnNextTick(); + + expect(pushed1).to.equal(true); + expect(pushed2).to.equal(true); + }); + + it('should resolve started promise when iteration begins', async () => { + let startedPromise!: Promise; + let started = false; + const sub = new Queue(({ started: _startedPromise }) => { + startedPromise = _startedPromise; + + startedPromise.then(() => { + started = true; + }); + }).subscribe(); + + expect(started).to.equal(false); + + sub.next(); + + await resolveOnNextTick(); + + expect(started).to.equal(true); + }); + + it('should resolve stopped promise when iteration ends', async () => { + let stoppedPromise!: Promise; + let stopped = false; + new Queue(({ stop, stopped: _stoppedPromise }) => { + stoppedPromise = _stoppedPromise; + + stoppedPromise.then(() => { + stopped = true; + }); + stop(); + }).subscribe(); + + expect(stopped).to.equal(false); + + await resolveOnNextTick(); + + expect(stopped).to.equal(true); + }); + + it('stops in an error state when calling stopped with a reason, i.e. the last call to next to reject with that reason', async () => { + let stoppedPromise!: Promise; + let stopped = false; + const sub = new Queue(({ push, stop, stopped: _stoppedPromise }) => { + stoppedPromise = _stoppedPromise; + + stoppedPromise.then(() => { + stopped = true; + }); + push(1); + stop(new Error('Oops')); + }).subscribe(); + + expect(stopped).to.equal(false); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + await expectPromise(sub.next()).toRejectWith('Oops'); + + expect(stopped).to.equal(true); + }); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 66a353de10..cab7dc4a36 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -71,13 +71,13 @@ import { import { getVariableSignature } from './getVariableSignature.js'; import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; +import { Queue } from './Queue.js'; import type { CompletedExecutionGroup, ExecutionResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, PendingExecutionGroup, - StreamItemRecord, StreamItemResult, StreamRecord, } from './types.js'; @@ -162,6 +162,7 @@ export interface ValidatedExecutionArgs { validatedExecutionArgs: ValidatedExecutionArgs, ) => PromiseOrValue; enableEarlyExecution: boolean; + streamQueueCapacity: number; hideSuggestions: boolean; abortSignal: AbortSignal | undefined; } @@ -203,6 +204,8 @@ export interface ExecutionArgs { options?: { /** Set the maximum number of errors allowed for coercing (defaults to 50). */ maxCoercionErrors?: number; + /** Configure the capacity used for stream item queues (defaults to 100). */ + streamQueueCapacity?: number; }; } @@ -562,6 +565,7 @@ export function validateExecutionArgs( subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, perEventExecutor: perEventExecutor ?? executeSubscriptionEvent, enableEarlyExecution: enableEarlyExecution === true, + streamQueueCapacity: options?.streamQueueCapacity ?? 
100, hideSuggestions, abortSignal: args.abortSignal ?? undefined, }; @@ -1354,7 +1358,7 @@ async function completeAsyncIteratorValue( try { while (true) { if (streamUsage && index >= streamUsage.initialCount) { - const streamItemQueue = buildAsyncStreamItemQueue( + const streamItemQueue = buildStreamItemQueue( index, path, asyncIterator, @@ -1537,16 +1541,12 @@ function completeIterableValue( path, ); const iterator = items[Symbol.iterator](); - let iteration = iterator.next(); - while (!iteration.done) { - const item = iteration.value; - + while (true) { if (streamUsage && index >= streamUsage.initialCount) { const syncStreamRecord: StreamRecord = { label: streamUsage.label, path, - streamItemQueue: buildSyncStreamItemQueue( - item, + streamItemQueue: buildStreamItemQueue( index, path, iterator, @@ -1561,6 +1561,13 @@ function completeIterableValue( break; } + const iteration = iterator.next(); + if (iteration.done) { + break; + } + + const item = iteration.value; + // No need to modify the info object containing the path, // since from here on it is not ever accessed by resolver functions. const itemPath = addPath(path, index, undefined); @@ -1597,8 +1604,6 @@ function completeIterableValue( containsPromise = true; } index++; - - iteration = iterator.next(); } return containsPromise @@ -2624,176 +2629,68 @@ function getDeferredFragmentRecords( ); } -function buildSyncStreamItemQueue( - initialItem: PromiseOrValue, +function buildStreamItemQueue( initialIndex: number, streamPath: Path, - iterator: Iterator, + iterator: Iterator | AsyncIterator, exeContext: ExecutionContext, fieldDetailsList: FieldDetailsList, info: GraphQLResolveInfo, itemType: GraphQLOutputType, -): Array { - const streamItemQueue: Array = []; - - const enableEarlyExecution = - exeContext.validatedExecutionArgs.enableEarlyExecution; - - const firstExecutor = () => { - const initialPath = addPath(streamPath, initialIndex, undefined); - const firstStreamItem = new BoxedPromiseOrValue( - completeStreamItem( - initialPath, - initialItem, +): Queue { + const { enableEarlyExecution, streamQueueCapacity } = + exeContext.validatedExecutionArgs; + return new Queue(async ({ push, stop, started }) => { + if (!enableEarlyExecution) { + await started; + } + let index = initialIndex; + while (true) { + let iteration; + try { + iteration = iterator.next(); + if (isPromise(iteration)) { + // eslint-disable-next-line no-await-in-loop + iteration = await iteration; + } + } catch (rawError) { + throw locatedError( + rawError, + toNodes(fieldDetailsList), + pathToArray(streamPath), + ); + } + + if (iteration.done) { + stop(); + return; + } + + const itemPath = addPath(streamPath, index, undefined); + + let streamItemResult = completeStreamItem( + itemPath, + iteration.value, exeContext, { errors: [], completed: false }, fieldDetailsList, info, itemType, - ), - ); - - let iteration = iterator.next(); - let currentIndex = initialIndex + 1; - let currentStreamItem: - | BoxedPromiseOrValue - | (() => BoxedPromiseOrValue) = firstStreamItem; - while (!iteration.done) { - // TODO: add test case for early sync termination - /* c8 ignore next 6 */ - if (currentStreamItem instanceof BoxedPromiseOrValue) { - const result = currentStreamItem.value; - if (!isPromise(result) && result.item === undefined) { - break; - } + ); + if (isPromise(streamItemResult) && !enableEarlyExecution) { + // eslint-disable-next-line no-await-in-loop + streamItemResult = await streamItemResult; } - - const itemPath = addPath(streamPath, currentIndex, undefined); - - const 
value = iteration.value; - - const currentExecutor = () => - completeStreamItem( - itemPath, - value, - exeContext, - { errors: [], completed: false }, - fieldDetailsList, - info, - itemType, - ); - - currentStreamItem = enableEarlyExecution - ? new BoxedPromiseOrValue(currentExecutor()) - : () => new BoxedPromiseOrValue(currentExecutor()); - - streamItemQueue.push(currentStreamItem); - - iteration = iterator.next(); - currentIndex = initialIndex + 1; + const pushResult = push(streamItemResult); + // TODO: add back-pressure test + /* c8 ignore next 4 */ + if (isPromise(pushResult)) { + // eslint-disable-next-line no-await-in-loop + await pushResult; + } + index += 1; } - - streamItemQueue.push(new BoxedPromiseOrValue({})); - - return firstStreamItem.value; - }; - - streamItemQueue.push( - enableEarlyExecution - ? new BoxedPromiseOrValue(Promise.resolve().then(firstExecutor)) - : () => new BoxedPromiseOrValue(firstExecutor()), - ); - - return streamItemQueue; -} - -function buildAsyncStreamItemQueue( - initialIndex: number, - streamPath: Path, - asyncIterator: AsyncIterator, - exeContext: ExecutionContext, - fieldDetailsList: FieldDetailsList, - info: GraphQLResolveInfo, - itemType: GraphQLOutputType, -): Array { - const streamItemQueue: Array = []; - const executor = () => - getNextAsyncStreamItemResult( - streamItemQueue, - streamPath, - initialIndex, - asyncIterator, - exeContext, - fieldDetailsList, - info, - itemType, - ); - - streamItemQueue.push( - exeContext.validatedExecutionArgs.enableEarlyExecution - ? new BoxedPromiseOrValue(executor()) - : () => new BoxedPromiseOrValue(executor()), - ); - - return streamItemQueue; -} - -async function getNextAsyncStreamItemResult( - streamItemQueue: Array, - streamPath: Path, - index: number, - asyncIterator: AsyncIterator, - exeContext: ExecutionContext, - fieldDetailsList: FieldDetailsList, - info: GraphQLResolveInfo, - itemType: GraphQLOutputType, -): Promise { - let iteration; - try { - iteration = await asyncIterator.next(); - } catch (error) { - return { - errors: [ - locatedError(error, toNodes(fieldDetailsList), pathToArray(streamPath)), - ], - }; - } - - if (iteration.done) { - return {}; - } - - const itemPath = addPath(streamPath, index, undefined); - - const result = completeStreamItem( - itemPath, - iteration.value, - exeContext, - { errors: [], completed: false }, - fieldDetailsList, - info, - itemType, - ); - - const executor = () => - getNextAsyncStreamItemResult( - streamItemQueue, - streamPath, - index + 1, - asyncIterator, - exeContext, - fieldDetailsList, - info, - itemType, - ); - - streamItemQueue.push( - exeContext.validatedExecutionArgs.enableEarlyExecution - ? 
new BoxedPromiseOrValue(executor()) - : () => new BoxedPromiseOrValue(executor()), - ); - - return result; + }, streamQueueCapacity); } function completeStreamItem( @@ -2824,53 +2721,62 @@ function completeStreamItem( incrementalContext.completed = true; return buildStreamItemResult(incrementalContext.errors, resolvedItem); }, - (error: unknown) => { + (rawError: unknown) => { incrementalContext.completed = true; - return { - errors: [...incrementalContext.errors, error as GraphQLError], - }; + handleFieldError( + rawError, + exeContext, + itemType, + fieldDetailsList, + itemPath, + incrementalContext, + ); + return buildStreamItemResult(incrementalContext.errors, { + rawResult: null, + newDeferredFragmentRecords: undefined, + incrementalDataRecords: undefined, + }); }, ); } let result: PromiseOrValue>; try { - try { - result = completeValue( - exeContext, - itemType, - fieldDetailsList, - info, - itemPath, - item, - incrementalContext, - new Map(), - ); - } catch (rawError) { - handleFieldError( - rawError, - exeContext, - itemType, - fieldDetailsList, - itemPath, - incrementalContext, - ); - result = { - rawResult: null, - newDeferredFragmentRecords: undefined, - incrementalDataRecords: undefined, - }; - } - } catch (error) { + result = completeValue( + exeContext, + itemType, + fieldDetailsList, + info, + itemPath, + item, + incrementalContext, + new Map(), + ); + } catch (rawError) { incrementalContext.completed = true; - return { - errors: [...incrementalContext.errors, error], - }; + handleFieldError( + rawError, + exeContext, + itemType, + fieldDetailsList, + itemPath, + incrementalContext, + ); + return buildStreamItemResult(incrementalContext.errors, { + rawResult: null, + newDeferredFragmentRecords: undefined, + incrementalDataRecords: undefined, + }); } if (isPromise(result)) { - return result - .then(undefined, (rawError: unknown) => { + return result.then( + (resolved) => { + incrementalContext.completed = true; + return buildStreamItemResult(incrementalContext.errors, resolved); + }, + (rawError: unknown) => { + incrementalContext.completed = true; handleFieldError( rawError, exeContext, @@ -2879,24 +2785,13 @@ function completeStreamItem( itemPath, incrementalContext, ); - return { + return buildStreamItemResult(incrementalContext.errors, { rawResult: null, newDeferredFragmentRecords: undefined, incrementalDataRecords: undefined, - }; - }) - .then( - (resolvedItem) => { - incrementalContext.completed = true; - return buildStreamItemResult(incrementalContext.errors, resolvedItem); - }, - (error: unknown) => { - incrementalContext.completed = true; - return { - errors: [...incrementalContext.errors, error as GraphQLError], - }; - }, - ); + }); + }, + ); } incrementalContext.completed = true; diff --git a/src/execution/types.ts b/src/execution/types.ts index 638f1d3725..2d6399fbb8 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -7,6 +7,8 @@ import type { GraphQLFormattedError, } from '../error/GraphQLError.js'; +import type { Queue } from './Queue.js'; + /** * The result of GraphQL execution. 
 *
@@ -246,7 +248,7 @@ export function isDeferredFragmentRecord(
 }
 
 export interface StreamItemResult {
-  item?: unknown;
+  item: unknown;
   newDeferredFragmentRecords?:
     | ReadonlyArray<DeferredFragmentRecord>
     | undefined;
@@ -254,13 +256,11 @@ export interface StreamItemResult {
   errors?: ReadonlyArray<GraphQLError>;
 }
 
-export type StreamItemRecord = ThunkIncrementalResult<StreamItemResult>;
-
 export interface StreamRecord {
   path: Path;
   label: string | undefined;
   id?: string | undefined;
-  streamItemQueue: Array<StreamItemRecord>;
+  streamItemQueue: Queue<StreamItemResult>;
 }
 
 export interface StreamItemsResult {

From 5b39853531b65d50793045cf0bf923eb19f94c4a Mon Sep 17 00:00:00 2001
From: Yaacov Rydzinski
Date: Wed, 3 Dec 2025 14:59:29 +0200
Subject: [PATCH 2/2] introduce WorkQueue

---
 src/execution/CreateWorkQueue.md          |  206 +++
 src/execution/IncrementalGraph.ts         |  315 ----
 src/execution/IncrementalPublisher.ts     |  327 +++--
 src/execution/WorkQueue.ts                |  664 +++++++++
 src/execution/__tests__/WorkQueue-test.ts | 1262 +++++++++++++++++
 src/execution/__tests__/defer-test.ts     |   74 +
 src/execution/__tests__/stream-test.ts    |  298 ++--
 src/execution/execute.ts                  |  178 +--
 src/execution/types.ts                    |  151 +-
 src/jsutils/BoxedPromiseOrValue.ts        |   26 -
 .../__tests__/BoxedPromiseOrValue-test.ts  |   30 -
 11 files changed, 2722 insertions(+), 809 deletions(-)
 create mode 100644 src/execution/CreateWorkQueue.md
 delete mode 100644 src/execution/IncrementalGraph.ts
 create mode 100644 src/execution/WorkQueue.ts
 create mode 100644 src/execution/__tests__/WorkQueue-test.ts
 delete mode 100644 src/jsutils/BoxedPromiseOrValue.ts
 delete mode 100644 src/jsutils/__tests__/BoxedPromiseOrValue-test.ts

diff --git a/src/execution/CreateWorkQueue.md b/src/execution/CreateWorkQueue.md
new file mode 100644
index 0000000000..ff5de7cee0
--- /dev/null
+++ b/src/execution/CreateWorkQueue.md
@@ -0,0 +1,206 @@
+## Creating the Work Queue
+
+### Overview
+
+{CreateWorkQueue} returns the initial work and an event stream that reports work progress. The internal state of the queue is represented as an acyclic directed graph with its own internal queue of task-completion and stream events. The graph evolves as work proceeds, modeling relationships among groups, tasks, and streams; new nodes may be discovered on internal {TASK_SUCCESS} or {STREAM_ITEMS} events, each of which provides a value and optional new work (e.g. new groups, tasks, or streams).
+
+The internal events include:
+
+- {TASK_SUCCESS} reports that a {task} completed successfully with a {result} (containing a {value} and optional {work}).
+- {TASK_FAILURE} carries the failed {task} and the causal {error}.
+- {STREAM_ITEMS} reports that a {stream} produced one or more {items} (each containing a {value} and optional {work}).
+- {STREAM_SUCCESS} announces that a {stream} completed successfully.
+- {STREAM_FAILURE} includes both the {stream} and the terminal {error}.
+
+Consumers of the work queue do not see these internal events directly; instead, they receive external events emitted on the returned observable {workEventStream}. These external events summarize progress at the granularity of groups and streams, abstracting away individual task completions. Each event also enumerates any newly visible work segments (groups and streams that became roots) so consumers can react without re-querying the queue's internal state.
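+Before the enumeration below, a rough TypeScript sketch of both event families (all names and shapes here are illustrative placeholders, not actual exports of this module):
+
+```ts
+// Placeholder node types; the real definitions live in the implementation.
+interface Group {}
+interface Stream {}
+interface Task {}
+interface Work {}
+interface TaskResult {
+  value: unknown;
+  work?: Work;
+}
+interface StreamItem {
+  value: unknown;
+  work?: Work;
+}
+
+// Internal events, visible only to the work queue itself.
+type GraphEvent =
+  | { kind: 'TASK_SUCCESS'; task: Task; result: TaskResult }
+  | { kind: 'TASK_FAILURE'; task: Task; error: unknown }
+  | { kind: 'STREAM_ITEMS'; stream: Stream; items: ReadonlyArray<StreamItem> }
+  | { kind: 'STREAM_SUCCESS'; stream: Stream }
+  | { kind: 'STREAM_FAILURE'; stream: Stream; error: unknown };
+
+// External events, yielded on the returned work event stream.
+type WorkEvent =
+  | { kind: 'GROUP_VALUES'; group: Group; values: ReadonlyArray<unknown> }
+  | {
+      kind: 'GROUP_SUCCESS';
+      group: Group;
+      values: ReadonlyArray<unknown>;
+      newGroups: ReadonlyArray<Group>;
+      newStreams: ReadonlyArray<Stream>;
+    }
+  | { kind: 'GROUP_FAILURE'; group: Group; error: unknown }
+  | {
+      kind: 'STREAM_VALUES';
+      stream: Stream;
+      values: ReadonlyArray<unknown>;
+      newGroups: ReadonlyArray<Group>;
+      newStreams: ReadonlyArray<Stream>;
+    }
+  | { kind: 'STREAM_SUCCESS'; stream: Stream }
+  | { kind: 'STREAM_FAILURE'; stream: Stream; error: unknown }
+  | { kind: 'WORK_QUEUE_TERMINATION' };
+```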
+The external events include {STREAM_SUCCESS} and {STREAM_FAILURE}, which correspond directly to their internal counterparts, as well as:
+
+- {GROUP_VALUES} delivers the task {values} for a completed root {group} (tasks already surfaced via other groups are omitted).
+- {GROUP_SUCCESS} delivers the completed {group}, a list of its task {values} (omitting those task {values} previously delivered within other groups), and any {newGroups}/{newStreams} that became roots.
+- {GROUP_FAILURE} carries the failed {group} and the propagated {error}.
+- {STREAM_VALUES} reports the producing {stream}, the yielded batch of item {values}, and any {newGroups}/{newStreams} introduced once the batch's {work} is integrated.
+- {WORK_QUEUE_TERMINATION} signals that there are no more root groups or streams.
+
+The work event stream concludes when the graph is empty.
+
+### Graph Definition
+
+- Let the graph be {G} = ({V}, {Egc}, {Egt}, {Ets}, {Etv}), where:
+  - {V} = {Vgroup} ∪ {Vtask} ∪ {Vstream} ∪ {Vvalue}.
+  - {Egc} are {group} → {childGroup} edges, capturing that a {childGroup} can begin only after its parent group succeeds.
+  - {Egt} are {group} → {task} edges.
+  - {Ets} are {task} → {stream} edges, capturing that a {stream} can begin as soon as its producing {task} yields an item.
+  - {Etv} are {task} → {value} edges, each pointing to a node that stores the completion value of its source task.
+- A root {group} has no incoming edge in {Egc}.
+- A root {stream} has no incoming edge in {Ets}.
+- Each {task} may be targeted by more than one {group} via {Egt}. A {task} always includes the list of {groups} that target it, as well as a function to execute. A {task} executes once and either fails with an {error} or succeeds with a {value} and optional {work}. The {value} component is stored in a {value} node targeted by {task} via {Etv}. Any {work} is immediately integrated (by {IntegrateWork}).
+- A {group} completes when every {task} it targets has completed successfully. A {group} fails if any {task} it targets fails.
+- A {stream} runs once started, producing zero or more items and then terminating either successfully or with an {error}. Each produced item yields a {value} and optional {work}; the {values} for each batch are reported via the internal {STREAM_ITEMS} event (which drives the external {STREAM_VALUES} event surfaced to consumers) and the {work} is immediately integrated into the graph.
+
+### Creating the Work Queue
+
+This algorithm constructs the internal {graphEventQueue}, initializes the graph with the supplied {groups}, {tasks}, and {streams}, and returns the observable {workEventStream}.
+
+CreateWorkQueue(work):
+
+- Let {graphEventQueue} be a new queue (capacity 1).
+- Let {graph} be initially empty.
+- Let {maybeEmptyNewGroups} and {newStreams} be the result of {IntegrateWork(work)}.
+- Let {newGroups} be the result of {PruneEmptyGroups(maybeEmptyNewGroups)}.
+- Call {StartWork(newGroups, newStreams)}.
+- Let {workEventStream} be the following event stream (each iteration yields the batch of external events produced while handling one batch of graph events):
+  - For each event {e} produced by {graphEventQueue}:
+    - If {e} is {TASK_SUCCESS(task, result)}, call {TaskSuccess(task, result)}.
+    - If {e} is {TASK_FAILURE(task, error)}, call {TaskFailure(task, error)}.
+    - If {e} is {STREAM_ITEMS(stream, items)}, call {StreamItems(stream, items)}.
+    - If {e} is {STREAM_SUCCESS(stream)}, call {StreamSuccess(stream)}.
+    - If {e} is {STREAM_FAILURE(stream, error)}, call {StreamFailure(stream, error)}.
+  - If {graph} has no root groups or root streams, yield {WORK_QUEUE_TERMINATION} on {workEventStream} and close {workEventStream}.
+- Return {newGroups}, {newStreams}, and {workEventStream}.
+
+The following algorithms have access to {graphEventQueue}, {graph}, and {workEventStream}.
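+As a rough sketch of the event-stream loop above, in TypeScript (reusing the illustrative types from the Overview; `handleGraphEvent` stands in for the per-event algorithms that follow, and `hasRoots` for the root check):
+
+```ts
+// Each handler returns the external events it produced for this batch.
+declare function handleGraphEvent(e: GraphEvent): Array<WorkEvent>;
+// Reports whether any root groups or root streams remain in the graph.
+declare function hasRoots(): boolean;
+
+async function* createWorkEventStream(
+  graphEvents: AsyncIterable<ReadonlyArray<GraphEvent>>,
+): AsyncGenerator<ReadonlyArray<WorkEvent>> {
+  for await (const batch of graphEvents) {
+    const out: Array<WorkEvent> = [];
+    for (const e of batch) {
+      out.push(...handleGraphEvent(e));
+    }
+    if (!hasRoots()) {
+      out.push({ kind: 'WORK_QUEUE_TERMINATION' });
+      yield out;
+      return;
+    }
+    if (out.length > 0) {
+      yield out;
+    }
+  }
+}
+```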
+#### Integrating and Starting Work
+
+IntegrateWork(work, parentTask):
+
+- Let {groups}, {tasks}, and {streams} be the respective sets drawn from {work} (missing fields denote empty sets).
+- Initialize {visitedGroups} to the empty set and {rootGroups} to the empty list.
+- For each group {g} in {groups}:
+  - Let {maybeRoot} be the result of {AddGroup(g, groups, parentTask, visitedGroups)}.
+  - If {maybeRoot} is defined, append it to {rootGroups}.
+- For each task {t} in {tasks}:
+  - Insert {t} into {graph}.
+  - Let {targetGroups} be the list of groups that target {t}.
+  - For each group {g} in {targetGroups}:
+    - Record {g → t}.
+    - If {g} is a root and {t} is not-yet-started, call {StartTask(t)}.
+- For each stream {s} in {streams}:
+  - Insert {s} into {graph}.
+  - If {parentTask} is defined, record {parentTask → s}; otherwise {s} is a root.
+- Return the newly inserted root groups (namely {rootGroups}) and the newly inserted root {streams}.
+
+AddGroup(group, groupSet, parentTask, visitedGroups):
+
+- If {group} is in {visitedGroups}, return.
+- Add {group} to {visitedGroups}.
+- If {parentTask} is defined, assert that {group} must specify a {parent} group; initial work and stream items are the only sources of root groups.
+- If {group} does not specify a {parent} group:
+  - Insert {group} into {graph}.
+  - Return {group}.
+- If {parent} is in {groupSet}:
+  - Let {ancestor} be the result of {AddGroup(parent, groupSet, parentTask, visitedGroups)}.
+  - Insert {group} into {graph} and record {parent → group}.
+  - Return {ancestor}.
+- Otherwise, if {parent} is in {graph}:
+  - Insert {group} into {graph} and record {parent → group}.
+
+PruneEmptyGroups(originalGroups):
+
+- Initialize {nonEmptyGroups} to the empty list.
+- For each {group} in {originalGroups}:
+  - If {group} targets at least one {task}, append {group} to {nonEmptyGroups} and continue to the next {group} in {originalGroups}.
+  - Let {maybeEmptyNewGroups} be the set of child groups targeted by {group}.
+  - Append the results of {PruneEmptyGroups(maybeEmptyNewGroups)} to {nonEmptyGroups}.
+  - Remove {group} from {graph}.
+- Return {nonEmptyGroups}.
+
+StartWork(newGroups, newStreams):
+
+- For each {group} in {newGroups}, call {StartGroup(group)}.
+- For each {stream} in {newStreams}, call {StartStream(stream)}.
+
+StartGroup(group):
+
+- For each {task} targeted by {group}:
+  - Call {StartTask(task)}.
+
+StartTask(task):
+
+- Start executing {task} such that:
+  - If and when {task} completes successfully with {result}, enqueue {TASK_SUCCESS(task, result)} onto {graphEventQueue}.
+  - If and when {task} fails with {error}, enqueue {TASK_FAILURE(task, error)} onto {graphEventQueue}.
+
+StartStream(stream):
+
+- Drain {stream} such that:
+  - On each successfully produced batch of {items}, enqueue {STREAM_ITEMS(stream, items)} onto {graphEventQueue}.
+  - If and when {stream} terminates successfully, enqueue {STREAM_SUCCESS(stream)} onto {graphEventQueue}.
+  - If and when {stream} terminates with {error}, enqueue {STREAM_FAILURE(stream, error)} onto {graphEventQueue}.
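+A sketch of how {StartTask} and {StartStream} might feed {graphEventQueue} (illustrative only; `execute` and the stream's async iteration are assumed shapes, and `push` is the queue's push function whose returned promise, if any, provides backpressure):
+
+```ts
+declare function push(e: GraphEvent): undefined | Promise<undefined>;
+
+function startTask(task: Task & { execute: () => Promise<TaskResult> }): void {
+  task.execute().then(
+    (result) => void push({ kind: 'TASK_SUCCESS', task, result }),
+    (error: unknown) => void push({ kind: 'TASK_FAILURE', task, error }),
+  );
+}
+
+async function startStream(
+  stream: Stream & AsyncIterable<ReadonlyArray<StreamItem>>,
+): Promise<void> {
+  try {
+    for await (const items of stream) {
+      // Awaiting push applies backpressure when the queue is at capacity.
+      await push({ kind: 'STREAM_ITEMS', stream, items });
+    }
+    void push({ kind: 'STREAM_SUCCESS', stream });
+  } catch (error) {
+    void push({ kind: 'STREAM_FAILURE', stream, error });
+  }
+}
+```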
+### Handling Task Success
+
+{TaskSuccess(task, result)} reconciles a completed task. It records the task's {value}, integrates any returned {work}, and then inspects the groups targeting the task: any root group whose tasks are all complete emits {GROUP_SUCCESS}, is removed from the graph, and takes its completed tasks (and their value nodes) with it. Because each task's work is integrated at completion time, removing the group merely frees newly exposed child groups and downstream streams to become root candidates whose work can be started. Each {GROUP_SUCCESS} event includes the resolved {group} and any {newGroups}/{newStreams} that surfaced because the {group} left the graph; the group's task {values}, if any, are reported immediately beforehand via {GROUP_VALUES}.
+
+TaskSuccess(task, result):
+
+- If {task} is no longer in {graph}, return early.
+- Let {value} and {work} be the respective fields of {result}.
+- Mark {task} complete by inserting a {value} node {v} into {Vvalue} that stores {value} and recording the edge {task → v} in {Etv}.
+- If {work} exists, call {IntegrateWork(work, task)}.
+- For each group {g} with {g → task} (prior to any removal of {task} while iterating):
+  - If all {tasks} targeted by {g} are complete and {g} is a root:
+    - Call {GroupSuccess(g)}.
+
+GroupSuccess(group):
+
+- Let {values} be the list of {value} entries for each {task} targeted by {group}, omitting entries for tasks that completed without a defined {value}; tasks themselves are **not** included alongside the values, and ordering is intentionally unspecified.
+- Remove {group} from {graph}, promoting each direct child group of {group} to a root candidate; let {maybeEmptyNewGroups} be the set of such promoted child groups.
+- Let {newGroups} be the result of {PruneEmptyGroups(maybeEmptyNewGroups)}.
+- Remove all {task} nodes targeted by {group} along with any associated {value} nodes, promoting each child stream of each such {task} to a root stream; let {newStreams} be the set of such promoted streams.
+- If {values} is non-empty, yield {GROUP_VALUES: { group, values }} on {workEventStream}.
+- Yield {GROUP_SUCCESS: { group, newGroups, newStreams }} on {workEventStream}.
+- Call {StartWork(newGroups, newStreams)}.
+
+### Handling Task Failure
+
+{TaskFailure(task, error)} handles the error path for a task by emitting {GROUP_FAILURE} for each group targeting it. Each payload reports the failed group and the propagated {error} so consumers can surface diagnostics. Because a task failure invalidates all dependent subgraphs, this procedure also tears down orphaned tasks, groups, and streams.
+
+TaskFailure(task, error):
+
+- If {task} is no longer in {graph}, return early.
+- For each group {group} with {group → task}:
+  - Yield {GROUP_FAILURE: { group, error }} on {workEventStream}.
+  - Remove {group} from {graph} along with its descendant groups.
+- Remove task nodes not targeted by any other group (tasks that were only targeted by the removed {group} or its removed descendants) and discard any associated {value} or {stream} nodes. Work associated with removed tasks and streams may be cancelled.
+
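+Before the stream-handling algorithms, a non-normative sketch of what a {stream} looks like concretely, modeled on the test suite's helper. The generic instantiations are illustrative, and the `push`/`stop` executor signature and backpressure behavior are taken from the accompanying `Queue` usage in the tests.
+
+```ts
+import { isPromise } from '../jsutils/isPromise.js';
+import { Queue } from './Queue.js';
+import type { Group, Stream, StreamItem } from './WorkQueue.js';
+
+type MyGroup = Group<MyGroup>;
+type MyStream = Stream<string, number, MyGroup, MyStream>;
+type MyStreamItem = StreamItem<string, number, MyGroup, MyStream>;
+
+// A stream wraps a Queue of items. The executor pushes items, awaiting
+// `push` whenever it returns a promise (the queue's backpressure signal),
+// and finally calls `stop()` to terminate the stream successfully.
+const stream: MyStream = {
+  queue: new Queue<MyStreamItem>(async ({ push, stop }) => {
+    for (const value of [1, 2, 3]) {
+      const pushed = push({ value }); // an item may also carry `work`
+      if (isPromise(pushed)) {
+        await pushed;
+      }
+    }
+    stop(); // drives STREAM_SUCCESS; throwing here instead drives STREAM_FAILURE
+  }),
+};
+```
+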
+### Handling Stream Items
+
+{StreamItems} moves incremental data from a running stream to the consumer. Each {STREAM_VALUES} event reports the producing {stream}, the yielded batch of item {values}, and any {newGroups}/{newStreams} introduced once each item's {work} is integrated. This event is the only one that carries arbitrary payload data from user-defined streams; all others describe structural progress.
+
+StreamItems(stream, items):
+
+- Let {values}, {newGroups}, and {newStreams} start empty.
+- For each {item} in {items}:
+  - Let {value} and {work} be the respective fields of {item}.
+  - If {work} exists:
+    - Let {maybeEmptyItemNewGroups} and {itemNewStreams} be the result of {IntegrateWork(work)}.
+    - Append the result of {PruneEmptyGroups(maybeEmptyItemNewGroups)} to {newGroups}.
+    - Append {itemNewStreams} to {newStreams}.
+  - Append {value} to {values}.
+- Call {StartWork(newGroups, newStreams)}.
+- Yield {STREAM_VALUES: { stream, values, newGroups, newStreams }} on {workEventStream}, where {newGroups} and {newStreams} denote the nodes that became roots as a result of integrating the batch's {work}.
+- If the {stream} queue is already stopped after draining {items}, also yield {STREAM_SUCCESS: { stream }} and remove {stream} from {graph}.
+
+### Handling Stream Completion
+
+{StreamSuccess} emits a terminal notification for streams, yielding {STREAM_SUCCESS} with the successful stream.
+
+StreamSuccess(stream):
+
+- If {stream} is no longer in {graph} (it may already have been removed by {StreamItems}), return early.
+- Yield {STREAM_SUCCESS: { stream }} on {workEventStream}.
+- Remove {stream} from {graph}.
+
+### Handling Stream Errors
+
+{StreamFailure} yields {STREAM_FAILURE} with both {stream} and the causal {error}.
+
+StreamFailure(stream, error):
+
+- Yield {STREAM_FAILURE: { stream, error }} on {workEventStream}.
+- Remove {stream} from {graph}.
diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts
deleted file mode 100644
index 786c956fa2..0000000000
--- a/src/execution/IncrementalGraph.ts
+++ /dev/null
@@ -1,315 +0,0 @@
-import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js';
-import { invariant } from '../jsutils/invariant.js';
-import { isPromise } from '../jsutils/isPromise.js';
-import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
-
-import type { GraphQLError } from '../error/GraphQLError.js';
-
-import { Queue } from './Queue.js';
-import type {
-  DeferredFragmentRecord,
-  DeliveryGroup,
-  IncrementalDataRecord,
-  IncrementalDataRecordResult,
-  PendingExecutionGroup,
-  StreamRecord,
-  SuccessfulExecutionGroup,
-} from './types.js';
-import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js';
-
-/**
- * @internal
- */
-export class IncrementalGraph {
-  private _rootNodes: Set;
-  private _completed: AsyncGenerator;
-  // _push and _stop are assigned in the executor which is executed
-  // synchronously by the Queue constructor.
- private _push!: (item: IncrementalDataRecordResult) => PromiseOrValue; - private _stop!: () => void; - - constructor( - reducer: ( - generator: Generator, - ) => U | undefined, - ) { - this._rootNodes = new Set(); - this._completed = new Queue( - ({ push, stop }) => { - this._push = push; - this._stop = stop; - }, - ).subscribe(reducer); - } - - getNewRootNodes( - newDeferredFragmentRecords: - | ReadonlyArray - | undefined, - incrementalDataRecords: ReadonlyArray, - ): ReadonlyArray { - const initialResultChildren = new Set(); - - if (newDeferredFragmentRecords !== undefined) { - for (const deferredFragmentRecord of newDeferredFragmentRecords) { - this._addDeferredFragment( - deferredFragmentRecord, - initialResultChildren, - ); - } - } - - this._addIncrementalDataRecords( - incrementalDataRecords, - undefined, - initialResultChildren, - ); - return this._promoteNonEmptyToRoot(initialResultChildren); - } - - addCompletedSuccessfulExecutionGroup( - successfulExecutionGroup: SuccessfulExecutionGroup, - ): void { - const { - pendingExecutionGroup, - newDeferredFragmentRecords, - incrementalDataRecords, - } = successfulExecutionGroup; - - if (newDeferredFragmentRecords !== undefined) { - for (const deferredFragmentRecord of newDeferredFragmentRecords) { - this._addDeferredFragment(deferredFragmentRecord, undefined); - } - } - - const deferredFragmentRecords = - pendingExecutionGroup.deferredFragmentRecords; - - for (const deferredFragmentRecord of deferredFragmentRecords) { - const { pendingExecutionGroups, successfulExecutionGroups } = - deferredFragmentRecord; - pendingExecutionGroups.delete(pendingExecutionGroup); - successfulExecutionGroups.add(successfulExecutionGroup); - } - - if (incrementalDataRecords !== undefined) { - this._addIncrementalDataRecords( - incrementalDataRecords, - deferredFragmentRecords, - ); - } - } - - hasNext(): boolean { - return this._rootNodes.size > 0; - } - - completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): - | { - newRootNodes: ReadonlyArray; - successfulExecutionGroups: ReadonlyArray; - } - | undefined { - if ( - !this._rootNodes.has(deferredFragmentRecord) || - deferredFragmentRecord.pendingExecutionGroups.size > 0 - ) { - return; - } - const successfulExecutionGroups = Array.from( - deferredFragmentRecord.successfulExecutionGroups, - ); - this._rootNodes.delete(deferredFragmentRecord); - for (const successfulExecutionGroup of successfulExecutionGroups) { - for (const otherDeferredFragmentRecord of successfulExecutionGroup - .pendingExecutionGroup.deferredFragmentRecords) { - otherDeferredFragmentRecord.successfulExecutionGroups.delete( - successfulExecutionGroup, - ); - } - } - const newRootNodes = this._promoteNonEmptyToRoot( - deferredFragmentRecord.children, - ); - this._maybeStop(); - return { newRootNodes, successfulExecutionGroups }; - } - - removeDeferredFragment( - deferredFragmentRecord: DeferredFragmentRecord, - ): boolean { - const deleted = this._rootNodes.delete(deferredFragmentRecord); - if (!deleted) { - return false; - } - this._maybeStop(); - return true; - } - - removeStream(streamRecord: StreamRecord): void { - this._rootNodes.delete(streamRecord); - this._maybeStop(); - } - - subscribe(): AsyncGenerator { - return this._completed; - } - - private _addIncrementalDataRecords( - incrementalDataRecords: ReadonlyArray, - parents: ReadonlyArray | undefined, - initialResultChildren?: Set, - ): void { - for (const incrementalDataRecord of incrementalDataRecords) { - if (isPendingExecutionGroup(incrementalDataRecord)) { 
- for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { - deferredFragmentRecord.pendingExecutionGroups.add( - incrementalDataRecord, - ); - } - if (this._completesRootNode(incrementalDataRecord)) { - this._onExecutionGroup(incrementalDataRecord); - } - } else if (parents === undefined) { - invariant(initialResultChildren !== undefined); - initialResultChildren.add(incrementalDataRecord); - } else { - for (const parent of parents) { - parent.children.add(incrementalDataRecord); - } - } - } - } - - private _promoteNonEmptyToRoot( - maybeEmptyNewRootNodes: Set, - ): ReadonlyArray { - const newRootNodes: Array = []; - for (const node of maybeEmptyNewRootNodes) { - if (isDeferredFragmentRecord(node)) { - if (node.pendingExecutionGroups.size > 0) { - for (const pendingExecutionGroup of node.pendingExecutionGroups) { - if (!this._completesRootNode(pendingExecutionGroup)) { - this._onExecutionGroup(pendingExecutionGroup); - } - } - this._rootNodes.add(node); - newRootNodes.push(node); - continue; - } - for (const child of node.children) { - maybeEmptyNewRootNodes.add(child); - } - } else { - this._rootNodes.add(node); - newRootNodes.push(node); - - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._onStreamItems(node); - } - } - return newRootNodes; - } - - private _completesRootNode( - pendingExecutionGroup: PendingExecutionGroup, - ): boolean { - return pendingExecutionGroup.deferredFragmentRecords.some( - (deferredFragmentRecord) => this._rootNodes.has(deferredFragmentRecord), - ); - } - - private _addDeferredFragment( - deferredFragmentRecord: DeferredFragmentRecord, - initialResultChildren: Set | undefined, - ): void { - const parent = deferredFragmentRecord.parent; - if (parent === undefined) { - invariant(initialResultChildren !== undefined); - initialResultChildren.add(deferredFragmentRecord); - return; - } - parent.children.add(deferredFragmentRecord); - } - - private _onExecutionGroup( - pendingExecutionGroup: PendingExecutionGroup, - ): void { - let completedExecutionGroup = pendingExecutionGroup.result; - if (!(completedExecutionGroup instanceof BoxedPromiseOrValue)) { - completedExecutionGroup = completedExecutionGroup(); - } - const value = completedExecutionGroup.value; - if (isPromise(value)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - value.then((resolved) => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._push(resolved); - }); - } else { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._push(value); - } - } - - private async _onStreamItems(streamRecord: StreamRecord): Promise { - const streamItemQueue = streamRecord.streamItemQueue; - let closed = false; - try { - await streamItemQueue.forEachBatch((streamItemResults) => { - const items: Array = []; - const errors: Array = []; - const newDeferredFragmentRecords: Array = []; - const incrementalDataRecords: Array = []; - - for (const result of streamItemResults) { - items.push(result.item); - if (result.errors !== undefined) { - errors.push(...result.errors); - } - if (result.newDeferredFragmentRecords !== undefined) { - newDeferredFragmentRecords.push( - ...result.newDeferredFragmentRecords, - ); - } - if (result.incrementalDataRecords !== undefined) { - incrementalDataRecords.push(...result.incrementalDataRecords); - } - } - - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._push({ - streamRecord, - result: - // TODO add additional test case or rework for 
coverage - errors.length > 0 /* c8 ignore start */ - ? { items, errors } /* c8 ignore stop */ - : { items }, - newDeferredFragmentRecords, - incrementalDataRecords, - }); - - if (streamItemQueue.isStopped()) { - closed = true; - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._push({ streamRecord }); - } - }); - } catch (error) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._push({ streamRecord, errors: [error] }); - return; - } - - if (!closed) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this._push({ streamRecord }); - } - } - - private _maybeStop(): void { - if (!this.hasNext()) { - this._stop(); - } - } -} diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 2fda046dff..42bf8f2edf 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -4,33 +4,29 @@ import { pathToArray } from '../jsutils/Path.js'; import type { GraphQLError } from '../error/GraphQLError.js'; import type { AbortSignalListener } from './AbortSignalListener.js'; -import { IncrementalGraph } from './IncrementalGraph.js'; +import { mapAsyncIterable } from './mapAsyncIterable.js'; import type { - CompletedExecutionGroup, CompletedResult, DeferredFragmentRecord, - DeliveryGroup, + ExecutionGroupValue, ExperimentalIncrementalExecutionResults, - IncrementalDataRecord, - IncrementalDataRecordResult, IncrementalDeferResult, IncrementalResult, - IncrementalStreamResult, + IncrementalWork, InitialIncrementalExecutionResult, PendingResult, - StreamItemsResult, + StreamItemValue, StreamRecord, SubsequentIncrementalExecutionResult, } from './types.js'; -import { isCompletedExecutionGroup, isFailedExecutionGroup } from './types.js'; import { withCleanup } from './withCleanup.js'; +import type { WorkQueueEvent } from './WorkQueue.js'; +import { createWorkQueue } from './WorkQueue.js'; -// eslint-disable-next-line max-params export function buildIncrementalResponse( result: ObjMap, errors: ReadonlyArray, - newDeferredFragmentRecords: ReadonlyArray | undefined, - incrementalDataRecords: ReadonlyArray, + work: IncrementalWork, earlyReturns: Map Promise>, abortSignalListener: AbortSignalListener | undefined, ): ExperimentalIncrementalExecutionResults { @@ -38,64 +34,56 @@ export function buildIncrementalResponse( earlyReturns, abortSignalListener, ); - return incrementalPublisher.buildResponse( - result, - errors, - newDeferredFragmentRecords, - incrementalDataRecords, - ); + return incrementalPublisher.buildResponse(result, errors, work); } interface SubsequentIncrementalExecutionResultContext { pending: Array; incremental: Array; completed: Array; + hasNext: boolean; } /** - * This class is used to publish incremental results to the client, enabling semi-concurrent - * execution while preserving result order. 
- * * @internal */ -class IncrementalPublisher { +export class IncrementalPublisher { + private _ids: Map; private _earlyReturns: Map Promise>; private _abortSignalListener: AbortSignalListener | undefined; private _nextId: number; - private _incrementalGraph: IncrementalGraph; constructor( earlyReturns: Map Promise>, abortSignalListener: AbortSignalListener | undefined, ) { + this._ids = new Map(); this._earlyReturns = earlyReturns; this._abortSignalListener = abortSignalListener; this._nextId = 0; - this._incrementalGraph = new IncrementalGraph((batch) => - this._handleCompletedBatch(batch), - ); } buildResponse( data: ObjMap, errors: ReadonlyArray, - newDeferredFragmentRecords: - | ReadonlyArray - | undefined, - incrementalDataRecords: ReadonlyArray, + work: IncrementalWork, ): ExperimentalIncrementalExecutionResults { - const newRootNodes = this._incrementalGraph.getNewRootNodes( - newDeferredFragmentRecords, - incrementalDataRecords, - ); + const { initialGroups, initialStreams, events } = createWorkQueue< + ExecutionGroupValue, + StreamItemValue, + DeferredFragmentRecord, + StreamRecord + >(work); - const pending = this._toPendingResults(newRootNodes); + const pending = this._toPendingResults(initialGroups, initialStreams); const initialResult: InitialIncrementalExecutionResult = errors.length ? { errors, data, pending, hasNext: true } : { data, pending, hasNext: true }; - const subsequentResults = this._incrementalGraph.subscribe(); + const subsequentResults = mapAsyncIterable(events, (batch) => + this._handleBatch(batch), + ); return { initialResult, @@ -106,169 +94,181 @@ class IncrementalPublisher { }; } - private _ensureId(deliveryGroup: DeliveryGroup): string { - return (deliveryGroup.id ??= String(this._nextId++)); + private _ensureId( + deferredFragmentOrStream: DeferredFragmentRecord | StreamRecord, + ): string { + let id = this._ids.get(deferredFragmentOrStream); + if (id !== undefined) { + return id; + } + id = String(this._nextId++); + this._ids.set(deferredFragmentOrStream, id); + return id; } private _toPendingResults( - newRootNodes: ReadonlyArray, + newGroups: ReadonlyArray, + newStreams: ReadonlyArray, ): Array { const pendingResults: Array = []; - for (const node of newRootNodes) { - const id = this._ensureId(node); - const pendingResult: PendingResult = { - id, - path: pathToArray(node.path), - }; - if (node.label !== undefined) { - pendingResult.label = node.label; + for (const collection of [newGroups, newStreams]) { + for (const node of collection) { + const id = this._ensureId(node); + const pendingResult: PendingResult = { + id, + path: pathToArray(node.path), + }; + if (node.label !== undefined) { + pendingResult.label = node.label; + } + pendingResults.push(pendingResult); } - pendingResults.push(pendingResult); } return pendingResults; } - private _handleCompletedBatch( - batch: Iterable, - ): SubsequentIncrementalExecutionResult | undefined { + private _handleBatch( + batch: ReadonlyArray< + WorkQueueEvent< + ExecutionGroupValue, + StreamItemValue, + DeferredFragmentRecord, + StreamRecord + > + >, + ): SubsequentIncrementalExecutionResult { const context: SubsequentIncrementalExecutionResultContext = { pending: [], incremental: [], completed: [], + hasNext: true, }; - for (const completedResult of batch) { - this._handleCompletedIncrementalData(completedResult, context); - } - - const { incremental, completed } = context; - if (incremental.length === 0 && completed.length === 0) { - return; + for (const event of batch) { + this._handleWorkQueueEvent(event, 
context); } - const hasNext = this._incrementalGraph.hasNext(); + const { incremental, completed, pending, hasNext } = context; - const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = - { hasNext }; - const pending = context.pending; + const result: SubsequentIncrementalExecutionResult = { hasNext }; if (pending.length > 0) { - subsequentIncrementalExecutionResult.pending = pending; + result.pending = pending; } if (incremental.length > 0) { - subsequentIncrementalExecutionResult.incremental = incremental; + result.incremental = incremental; } if (completed.length > 0) { - subsequentIncrementalExecutionResult.completed = completed; + result.completed = completed; } - return subsequentIncrementalExecutionResult; + return result; } - private _handleCompletedIncrementalData( - completedIncrementalData: IncrementalDataRecordResult, + private _handleWorkQueueEvent( + event: WorkQueueEvent< + ExecutionGroupValue, + StreamItemValue, + DeferredFragmentRecord, + StreamRecord + >, context: SubsequentIncrementalExecutionResultContext, ): void { - if (isCompletedExecutionGroup(completedIncrementalData)) { - this._handleCompletedExecutionGroup(completedIncrementalData, context); - } else { - this._handleCompletedStreamItems(completedIncrementalData, context); - } - } - - private _handleCompletedExecutionGroup( - completedExecutionGroup: CompletedExecutionGroup, - context: SubsequentIncrementalExecutionResultContext, - ): void { - if (isFailedExecutionGroup(completedExecutionGroup)) { - for (const deferredFragmentRecord of completedExecutionGroup - .pendingExecutionGroup.deferredFragmentRecords) { - if ( - this._incrementalGraph.removeDeferredFragment(deferredFragmentRecord) - ) { - const id = this._ensureId(deferredFragmentRecord); - context.completed.push({ + switch (event.kind) { + case 'GROUP_VALUES': { + const group = event.group; + const id = this._ensureId(group); + for (const value of event.values) { + const { bestId, subPath } = this._getBestIdAndSubPath( id, - errors: completedExecutionGroup.errors, - }); + group, + value, + ); + const incrementalEntry: IncrementalDeferResult = { + id: bestId, + data: value.data, + }; + if (value.errors !== undefined) { + incrementalEntry.errors = value.errors; + } + if (subPath !== undefined) { + incrementalEntry.subPath = subPath; + } + context.incremental.push(incrementalEntry); } + break; } - return; - } - - this._incrementalGraph.addCompletedSuccessfulExecutionGroup( - completedExecutionGroup, - ); - - for (const deferredFragmentRecord of completedExecutionGroup - .pendingExecutionGroup.deferredFragmentRecords) { - const completion = this._incrementalGraph.completeDeferredFragment( - deferredFragmentRecord, - ); - if (completion === undefined) { - continue; + case 'GROUP_SUCCESS': { + const group = event.group; + const id = this._ensureId(group); + context.completed.push({ id }); + this._ids.delete(group); + if (event.newGroups.length > 0 || event.newStreams.length > 0) { + context.pending.push( + ...this._toPendingResults(event.newGroups, event.newStreams), + ); + } + break; } - const id = this._ensureId(deferredFragmentRecord); - const incremental = context.incremental; - const { newRootNodes, successfulExecutionGroups } = completion; - context.pending.push(...this._toPendingResults(newRootNodes)); - for (const successfulExecutionGroup of successfulExecutionGroups) { - const { bestId, subPath } = this._getBestIdAndSubPath( + case 'GROUP_FAILURE': { + const { group, error } = event; + const id = this._ensureId(group); + 
context.completed.push({ id, - deferredFragmentRecord, - successfulExecutionGroup, + errors: [error as GraphQLError], + }); + this._ids.delete(group); + break; + } + case 'STREAM_VALUES': { + const stream = event.stream; + const id = this._ensureId(stream); + const { values, newGroups, newStreams } = event; + const items: Array = []; + const errors: Array = []; + for (const value of values) { + items.push(value.item); + if (value.errors !== undefined) { + errors.push(...value.errors); + } + } + context.incremental.push( + errors.length > 0 ? { id, items, errors } : { id, items }, ); - const incrementalEntry: IncrementalDeferResult = { - ...successfulExecutionGroup.result, - id: bestId, - }; - if (subPath !== undefined) { - incrementalEntry.subPath = subPath; + if (newGroups.length > 0 || newStreams.length > 0) { + context.pending.push( + ...this._toPendingResults(newGroups, newStreams), + ); } - incremental.push(incrementalEntry); + break; } - context.completed.push({ id }); - } - } - - private _handleCompletedStreamItems( - streamItemsResult: StreamItemsResult, - context: SubsequentIncrementalExecutionResultContext, - ): void { - const streamRecord = streamItemsResult.streamRecord; - const id = this._ensureId(streamRecord); - if (streamItemsResult.errors !== undefined) { - context.completed.push({ - id, - errors: streamItemsResult.errors, - }); - this._incrementalGraph.removeStream(streamRecord); - const earlyReturn = this._earlyReturns.get(streamRecord); - if (earlyReturn !== undefined) { - earlyReturn().catch(() => { - /* c8 ignore next 1 */ - // ignore error + case 'STREAM_SUCCESS': { + const stream = event.stream; + context.completed.push({ + id: this._ensureId(stream), }); - this._earlyReturns.delete(streamRecord); + this._ids.delete(stream); + this._earlyReturns.delete(stream); + break; } - } else if (streamItemsResult.result === undefined) { - context.completed.push({ id }); - this._incrementalGraph.removeStream(streamRecord); - this._earlyReturns.delete(streamRecord); - } else { - const incrementalEntry: IncrementalStreamResult = { - id, - ...streamItemsResult.result, - }; - - context.incremental.push(incrementalEntry); - - const { newDeferredFragmentRecords, incrementalDataRecords } = - streamItemsResult; - if (incrementalDataRecords !== undefined) { - const newRootNodes = this._incrementalGraph.getNewRootNodes( - newDeferredFragmentRecords, - incrementalDataRecords, - ); - context.pending.push(...this._toPendingResults(newRootNodes)); + case 'STREAM_FAILURE': { + const stream = event.stream; + context.completed.push({ + id: this._ensureId(stream), + errors: [event.error as GraphQLError], + }); + this._ids.delete(stream); + const earlyReturn = this._earlyReturns.get(stream); + if (earlyReturn !== undefined) { + earlyReturn().catch(() => { + /* c8 ignore next 1 */ + // ignore error + }); + this._earlyReturns.delete(stream); + } + break; + } + case 'WORK_QUEUE_TERMINATION': { + context.hasNext = false; + break; } } } @@ -276,17 +276,16 @@ class IncrementalPublisher { private _getBestIdAndSubPath( initialId: string, initialDeferredFragmentRecord: DeferredFragmentRecord, - completedExecutionGroup: CompletedExecutionGroup, + executionGroupValue: ExecutionGroupValue, ): { bestId: string; subPath: ReadonlyArray | undefined } { let maxLength = pathToArray(initialDeferredFragmentRecord.path).length; let bestId = initialId; - for (const deferredFragmentRecord of completedExecutionGroup - .pendingExecutionGroup.deferredFragmentRecords) { + for (const deferredFragmentRecord of 
executionGroupValue.deferredFragmentRecords) { if (deferredFragmentRecord === initialDeferredFragmentRecord) { continue; } - const id = deferredFragmentRecord.id; + const id = this._ids.get(deferredFragmentRecord); // TODO: add test case for when an fragment has not been released, but might be processed for the shortest path. /* c8 ignore next 3 */ if (id === undefined) { @@ -299,7 +298,7 @@ class IncrementalPublisher { bestId = id; } } - const subPath = completedExecutionGroup.path.slice(maxLength); + const subPath = executionGroupValue.path.slice(maxLength); return { bestId, subPath: subPath.length > 0 ? subPath : undefined, diff --git a/src/execution/WorkQueue.ts b/src/execution/WorkQueue.ts new file mode 100644 index 0000000000..7907f0f107 --- /dev/null +++ b/src/execution/WorkQueue.ts @@ -0,0 +1,664 @@ +import { isPromise } from '../jsutils/isPromise.js'; +import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; + +import { Queue } from './Queue.js'; + +export interface Group> { + parent?: TSelf | undefined; +} + +interface WorkResult< + TValue, + T, + I, + G extends Group, + S extends Stream, +> { + value: TValue; + work?: Work | undefined; +} + +export interface Stream< + T, + I, + G extends Group, + S extends Stream, +> { + queue: Queue>; +} + +export interface Work, S extends Stream> { + groups?: ReadonlyArray; + tasks?: ReadonlyArray>; + streams?: ReadonlyArray; +} + +interface NewWork, S extends Stream> { + newGroups: ReadonlyArray; + newStreams: ReadonlyArray; +} + +export interface WorkQueue< + T, + I, + G extends Group, + S extends Stream, +> { + initialGroups: ReadonlyArray; + initialStreams: ReadonlyArray; + events: AsyncGenerator>>; +} + +export type StreamItem< + T, + I, + G extends Group, + S extends Stream, +> = WorkResult; + +export type TaskResult< + T, + I, + G extends Group, + S extends Stream, +> = WorkResult; + +type MaybePromise = + | { status: 'fulfilled'; value: T } + | { status: 'pending'; promise: Promise } + | { status: 'rejected'; reason: unknown }; + +/** @internal **/ +export class Task, S extends Stream> { + readonly groups: ReadonlyArray; + private _fn: () => PromiseOrValue>; + private _maybePromise?: MaybePromise>; + constructor( + fn: () => PromiseOrValue>, + groups: ReadonlyArray, + ) { + this._fn = fn; + this.groups = groups; + } + start(): MaybePromise> { + if (this._maybePromise) { + return this._maybePromise; + } + try { + const result = this._fn(); + if (isPromise(result)) { + this._maybePromise = { status: 'pending', promise: result }; + result.then( + (value) => { + this._maybePromise = { status: 'fulfilled', value }; + }, + (reason: unknown) => { + this._maybePromise = { status: 'rejected', reason }; + }, + ); + } else { + this._maybePromise = { status: 'fulfilled', value: result }; + } + } catch (reason: unknown) { + this._maybePromise = { status: 'rejected', reason }; + } + return this._maybePromise; + } + result(): PromiseOrValue> { + const maybePromise = this.start(); + switch (maybePromise.status) { + case 'fulfilled': + return maybePromise.value; + case 'rejected': + throw maybePromise.reason; + case 'pending': { + return maybePromise.promise; + } + } + } +} + +interface TaskSuccessGraphEvent< + T, + I, + G extends Group, + S extends Stream, +> { + kind: 'TASK_SUCCESS'; + task: Task; + result: TaskResult; +} + +interface TaskFailureGraphEvent< + T, + I, + G extends Group, + S extends Stream, +> { + kind: 'TASK_FAILURE'; + task: Task; + error: unknown; +} + +interface StreamItemsEvent< + T, + I, + G extends Group, + S extends 
Stream, +> { + kind: 'STREAM_ITEMS'; + stream: S; + items: Generator>; +} + +interface StreamSuccessEvent< + T, + I, + G extends Group, + S extends Stream, +> { + kind: 'STREAM_SUCCESS'; + stream: S; +} + +interface StreamFailureEvent< + T, + I, + G extends Group, + S extends Stream, +> { + kind: 'STREAM_FAILURE'; + stream: S; + error: unknown; +} + +type GraphEvent, S extends Stream> = + | TaskSuccessGraphEvent + | TaskFailureGraphEvent + | StreamItemsEvent + | StreamSuccessEvent + | StreamFailureEvent; + +interface GroupValuesEvent< + T, + I, + G extends Group, + S extends Stream, +> { + kind: 'GROUP_VALUES'; + group: G; + values: ReadonlyArray; +} + +interface GroupSuccessEvent< + T, + I, + G extends Group, + S extends Stream, +> extends NewWork { + kind: 'GROUP_SUCCESS'; + group: G; +} + +interface GroupFailureEvent> { + kind: 'GROUP_FAILURE'; + group: G; + error: unknown; +} + +interface StreamValuesEvent< + T, + I, + G extends Group, + S extends Stream, +> extends NewWork { + kind: 'STREAM_VALUES'; + stream: S; + values: ReadonlyArray; +} + +interface WorkQueueTerminationEvent { + kind: 'WORK_QUEUE_TERMINATION'; +} + +export type WorkQueueEvent< + T, + I, + G extends Group, + S extends Stream, +> = + | GroupValuesEvent + | GroupSuccessEvent + | GroupFailureEvent + | StreamValuesEvent + | StreamSuccessEvent + | StreamFailureEvent + | WorkQueueTerminationEvent; + +interface GroupNode, S extends Stream> { + childGroups: Array; + tasks: Set>; + pending: number; +} + +interface TaskNode, S extends Stream> { + value: T | undefined; + childStreams: Array; +} + +/** @internal */ +export function createWorkQueue< + T, + I, + G extends Group, + S extends Stream, +>(initialWork: Work | undefined): WorkQueue { + const rootGroups = new Set(); + const rootStreams = new Set(); + const groupNodes = new Map>(); + const taskNodes = new Map, TaskNode>(); + let pushGraphEvent!: (e: GraphEvent) => PromiseOrValue; + let stopGraphEvents!: (err?: unknown) => void; + + const { newGroups: initialRootGroups, newStreams: initialRootStreams } = + maybeIntegrateWork(initialWork); + const nonEmptyInitialRootGroups = pruneEmptyGroups(initialRootGroups); + + const events = new Queue>( + ({ push: _push, stop: _stop, started: _started }) => { + pushGraphEvent = _push; + stopGraphEvents = _stop; + // eslint-disable-next-line @typescript-eslint/no-floating-promises + _started.then(() => + startWork(nonEmptyInitialRootGroups, initialRootStreams), + ); + }, + 1, + ).subscribe((graphEvents) => handleGraphEvents(graphEvents)); + + return { + initialGroups: nonEmptyInitialRootGroups, + initialStreams: initialRootStreams, + events, + }; + + function maybeIntegrateWork( + work: Work | undefined, + parentTask?: Task, + ): NewWork { + if (!work) { + return { newGroups: [], newStreams: [] }; + } + const { groups, tasks, streams } = work; + const newGroups = groups ? addGroups(groups, parentTask) : []; + if (tasks) { + for (const task of tasks) { + addTask(task); + } + } + const newStreams = streams ? 
addStreams(streams, parentTask) : []; + return { newGroups, newStreams }; + } + + function addGroups( + originalGroups: ReadonlyArray, + parentTask?: Task, + ): Array { + const groupSet = new Set(originalGroups); + const visited = new Set(); + const newRootGroups: Array = []; + for (const group of originalGroups) { + addGroup(group, groupSet, newRootGroups, visited, parentTask); + } + return newRootGroups; + } + + function addGroup( + group: G, + groupSet: ReadonlySet, + newRootGroups: Array, + visited: Set, + parentTask?: Task, + ): void { + if (visited.has(group)) { + return; + } + visited.add(group); + const parent = group.parent; + if (parent !== undefined && groupSet.has(parent)) { + addGroup(parent, groupSet, newRootGroups, visited, parentTask); + } + + const groupNode: GroupNode = { + childGroups: [], + tasks: new Set(), + pending: 0, + }; + groupNodes.set(group, groupNode); + + if (parentTask === undefined && !parent) { + newRootGroups.push(group); + } else if (parent) { + groupNodes.get(parent)?.childGroups.push(group); + } + } + + function addTask(task: Task): void { + for (const group of task.groups) { + const groupNode = groupNodes.get(group); + if (groupNode) { + groupNode.tasks.add(task); + groupNode.pending++; + if (rootGroups.has(group)) { + startTask(task); + } + } + } + } + + function addStreams( + streams: ReadonlyArray, + parentTask?: Task, + ): ReadonlyArray { + if (!parentTask) { + return streams; + } + const taskNode = taskNodes.get(parentTask); + if (taskNode) { + taskNode.childStreams.push(...streams); + } + return []; + } + + function pruneEmptyGroups( + newGroups: ReadonlyArray, + nonEmptyNewGroups: Array = [], + ): ReadonlyArray { + for (const newGroup of newGroups) { + const newGroupState = groupNodes.get(newGroup); + if (newGroupState) { + if (newGroupState.pending === 0) { + groupNodes.delete(newGroup); + pruneEmptyGroups(newGroupState.childGroups, nonEmptyNewGroups); + } else { + nonEmptyNewGroups.push(newGroup); + } + } + } + return nonEmptyNewGroups; + } + + function startWork( + newGroups: ReadonlyArray, + newStreams: ReadonlyArray, + ): void { + for (const group of newGroups) { + startGroup(group); + } + for (const stream of newStreams) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + startStream(stream); + } + } + + function startGroup(group: G): void { + rootGroups.add(group); + const groupNode = groupNodes.get(group); + if (groupNode) { + for (const task of groupNode.tasks) { + startTask(task); + } + } + } + + function startTask(task: Task): void { + if (taskNodes.has(task)) { + return; + } + taskNodes.set(task, { + value: undefined, + childStreams: [], + }); + try { + const result = task.result(); + if (isPromise(result)) { + result.then( + (resolved) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + pushGraphEvent({ kind: 'TASK_SUCCESS', task, result: resolved }); + }, + (error: unknown) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + pushGraphEvent({ kind: 'TASK_FAILURE', task, error }); + }, + ); + } else { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + pushGraphEvent({ kind: 'TASK_SUCCESS', task, result }); + } + } catch (error) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + pushGraphEvent({ kind: 'TASK_FAILURE', task, error }); + } + } + + async function startStream(stream: S): Promise { + rootStreams.add(stream); + try { + await stream.queue.forEachBatch(async (items) => { + const pushed = pushGraphEvent({ + kind: 
'STREAM_ITEMS', + stream, + items, + }); + if (isPromise(pushed)) { + await pushed; + } + }); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + pushGraphEvent({ kind: 'STREAM_SUCCESS', stream }); + } catch (error) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + pushGraphEvent({ kind: 'STREAM_FAILURE', stream, error }); + } + } + + function handleGraphEvents( + graphEvents: Generator>, + ): ReadonlyArray> | undefined { + const workQueueEvents: Array> = []; + for (const graphEvent of graphEvents) { + switch (graphEvent.kind) { + case 'TASK_SUCCESS': + workQueueEvents.push(...taskSuccess(graphEvent)); + break; + case 'TASK_FAILURE': + workQueueEvents.push(...taskFailure(graphEvent)); + break; + case 'STREAM_ITEMS': + workQueueEvents.push(...streamItems(graphEvent)); + break; + case 'STREAM_SUCCESS': + // check whether already deleted within streamItems() + if (rootStreams.has(graphEvent.stream)) { + rootStreams.delete(graphEvent.stream); + workQueueEvents.push(graphEvent); + } + break; + case 'STREAM_FAILURE': + rootStreams.delete(graphEvent.stream); + workQueueEvents.push(graphEvent); + break; + } + } + + if (rootGroups.size === 0 && rootStreams.size === 0) { + stopGraphEvents(); + workQueueEvents.push({ kind: 'WORK_QUEUE_TERMINATION' }); + } + + return workQueueEvents.length > 0 ? workQueueEvents : undefined; + } + + function taskSuccess( + graphEvent: TaskSuccessGraphEvent, + ): ReadonlyArray< + GroupValuesEvent | GroupSuccessEvent + > { + const { task, result } = graphEvent; + const { value, work } = result; + const taskNode = taskNodes.get(task); + if (taskNode) { + taskNode.value = value; + } + maybeIntegrateWork(work, task); + + const groupEvents: Array< + GroupValuesEvent | GroupSuccessEvent + > = []; + const newGroups: Array = []; + const newStreams: Array = []; + for (const group of task.groups) { + const groupNode = groupNodes.get(group); + if (groupNode) { + groupNode.pending--; + if (rootGroups.has(group) && groupNode.pending === 0) { + const { + groupValuesEvent, + groupSuccessEvent, + newGroups: childNewGroups, + newStreams: childNewStreams, + } = finishGroupSuccess(group, groupNode); + if (groupValuesEvent) { + groupEvents.push(groupValuesEvent); + } + groupEvents.push(groupSuccessEvent); + newGroups.push(...childNewGroups); + newStreams.push(...childNewStreams); + } + } + } + + startWork(newGroups, newStreams); + return groupEvents; + } + + function taskFailure( + graphEvent: TaskFailureGraphEvent, + ): ReadonlyArray> { + const { task, error } = graphEvent; + taskNodes.delete(task); + const groupFailureEvents: Array> = []; + for (const group of task.groups) { + const groupNode = groupNodes.get(group); + if (groupNode) { + groupFailureEvents.push(finishGroupFailure(group, groupNode, error)); + } + } + return groupFailureEvents; + } + + function streamItems( + graphEvent: StreamItemsEvent, + ): + | [StreamValuesEvent] + | [StreamValuesEvent, StreamSuccessEvent] { + const { stream, items } = graphEvent; + const values: Array = []; + const newGroups: Array = []; + const newStreams: Array = []; + for (const { value, work } of items) { + const { newGroups: itemNewGroups, newStreams: itemNewStreams } = + maybeIntegrateWork(work); + const nonEmptyNewGroups = pruneEmptyGroups(itemNewGroups); + startWork(nonEmptyNewGroups, itemNewStreams); + values.push(value); + newGroups.push(...nonEmptyNewGroups); + newStreams.push(...itemNewStreams); + } + const streamValuesEvent: StreamValuesEvent = { + kind: 'STREAM_VALUES', + stream, + values, + 
newGroups, + newStreams, + }; + + // queues allow peeking ahead see if stream has stopped + if (stream.queue.isStopped()) { + rootStreams.delete(stream); + return [streamValuesEvent, { kind: 'STREAM_SUCCESS', stream }]; + } + return [streamValuesEvent]; + } + + function finishGroupSuccess( + group: G, + groupNode: GroupNode, + ): { + groupValuesEvent: GroupValuesEvent | undefined; + groupSuccessEvent: GroupSuccessEvent; + newGroups: ReadonlyArray; + newStreams: ReadonlyArray; + } { + groupNodes.delete(group); + const values: Array = []; + const newStreams: Array = []; + for (const task of groupNode.tasks) { + const taskNode = taskNodes.get(task); + if (taskNode) { + const { value, childStreams } = taskNode; + if (value !== undefined) { + values.push(value); + } + for (const childStream of childStreams) { + newStreams.push(childStream); + } + removeTask(task); + } + } + const newGroups = pruneEmptyGroups(groupNode.childGroups); + rootGroups.delete(group); + return { + groupValuesEvent: values.length + ? { kind: 'GROUP_VALUES', group, values } + : undefined, + groupSuccessEvent: { + kind: 'GROUP_SUCCESS', + group, + newGroups, + newStreams, + }, + newGroups, + newStreams, + }; + } + + function finishGroupFailure( + group: G, + groupNode: GroupNode, + error: unknown, + ): GroupFailureEvent { + removeGroup(group, groupNode); + rootGroups.delete(group); + return { kind: 'GROUP_FAILURE', group, error }; + } + + function removeGroup(group: G, groupNode: GroupNode): void { + groupNodes.delete(group); + for (const childGroup of groupNode.childGroups) { + const childGroupState = groupNodes.get(childGroup); + if (childGroupState) { + removeGroup(childGroup, childGroupState); + } + } + } + + function removeTask(task: Task): void { + for (const group of task.groups) { + const groupNode = groupNodes.get(group); + groupNode?.tasks.delete(task); + } + taskNodes.delete(task); + } +} diff --git a/src/execution/__tests__/WorkQueue-test.ts b/src/execution/__tests__/WorkQueue-test.ts new file mode 100644 index 0000000000..8775549c40 --- /dev/null +++ b/src/execution/__tests__/WorkQueue-test.ts @@ -0,0 +1,1262 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { expectPromise } from '../../__testUtils__/expectPromise.js'; +import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; + +import { isPromise } from '../../jsutils/isPromise.js'; +import type { PromiseOrValue } from '../../jsutils/PromiseOrValue.js'; + +import { Queue } from '../Queue.js'; +import type { + Group, + Stream, + StreamItem, + TaskResult, + Work, + WorkQueueEvent, +} from '../WorkQueue.js'; +import { createWorkQueue, Task } from '../WorkQueue.js'; + +type TestTaskValue = string | number; +type TestStreamItemValue = number; +type TestGroup = Group; +type TestStream = Stream< + TestTaskValue, + TestStreamItemValue, + TestGroup, + TestStream +>; +type TestStreamItem = StreamItem< + TestTaskValue, + TestStreamItemValue, + TestGroup, + TestStream +>; +type TestWork = Work; +type TestWorkQueueEvent = WorkQueueEvent< + TestTaskValue, + TestStreamItemValue, + TestGroup, + TestStream +>; + +async function collectWorkRun(work: TestWork): Promise<{ + events: Array; + initialGroups: ReadonlyArray; + initialStreams: ReadonlyArray; +}> { + const events: Array = []; + const workQueue = createWorkQueue< + TestTaskValue, + TestStreamItemValue, + TestGroup, + TestStream + >(work); + + for await (const batch of workQueue.events) { + events.push(...batch); + } + return { + ...workQueue, + events, + }; +} + 
+function makeTask( + groups: ReadonlyArray, + valueOrFactory: + | TestTaskValue + | (() => PromiseOrValue< + TaskResult + >), + work?: TestWork, +): Task { + if (typeof valueOrFactory === 'function') { + return new Task(valueOrFactory, groups); + } + return new Task(() => ({ value: valueOrFactory, work }), groups); +} + +const streamFailureError = new Error('stream failure'); + +function streamFrom( + items: ReadonlyArray, + options?: { + throwAfter?: boolean; + error?: Error; + initialCapacity?: number; + }, +): TestStream { + const { throwAfter, error, initialCapacity } = options ?? {}; + return { + queue: new Queue(async ({ push, stop }) => { + for (const item of items) { + const pushed = push(item); + if (isPromise(pushed)) { + // eslint-disable-next-line no-await-in-loop + await pushed; + } + } + if (throwAfter) { + throw error ?? streamFailureError; + } + stop(); + }, initialCapacity), + }; +} + +describe('Task', () => { + it('can return a result', () => { + const task = makeTask([], () => ({ value: 123 })); + + expect(task.result()).to.deep.equal({ value: 123 }); + }); + + it('can be started manually', () => { + const task = makeTask([], () => ({ value: 123 })); + + task.start(); + expect(task.result()).to.deep.equal({ value: 123 }); + }); + + it('only runs once when started multiple times', async () => { + let runCount = 0; + const task = makeTask([], () => { + runCount++; + return { value: 'done' }; + }); + + await Promise.all([task.start(), task.start(), task.start()]); + const results = [task.result(), task.result(), task.result()]; + + expect(results).to.deep.equal([ + { value: 'done' }, + { value: 'done' }, + { value: 'done' }, + ]); + expect(runCount).to.equal(1); + }); + + it('stores async result via result()', async () => { + let runCount = 0; + const task = makeTask([], async () => { + runCount++; + await resolveOnNextTick(); + return { value: 'done' }; + }); + + await Promise.all([task.start(), task.start(), task.start()]); + const results = await Promise.all([ + task.result(), + task.result(), + task.result(), + ]); + + expect(results).to.deep.equal([ + { value: 'done' }, + { value: 'done' }, + { value: 'done' }, + ]); + expect(runCount).to.equal(1); + }); + + it('stores sync error in result()', () => { + let runCount = 0; + const task = makeTask([], () => { + runCount++; + throw new Error('failure'); + }); + + expect(() => task.start()).to.not.throw(); + expect(() => task.result()).to.throw('failure'); + expect(() => task.result()).to.throw('failure'); + expect(runCount).to.equal(1); + }); + + it('stores async error in result()', async () => { + let runCount = 0; + const task = makeTask([], async () => { + runCount++; + await resolveOnNextTick(); + throw new Error('failure'); + }); + + expect(() => task.start()).to.not.throw(); + await expectPromise(task.result()).toRejectWith('failure'); + expect(() => task.result()).to.throw('failure'); + expect(runCount).to.equal(1); + }); +}); + +describe('WorkQueue', () => { + it('runs parent and child groups sequentially', async () => { + const root: TestGroup = { parent: undefined }; + const child: TestGroup = { parent: root }; + + let childRan = false; + const childTask = makeTask([child], () => { + childRan = true; + return { value: 'child' }; + }); + const rootTask = makeTask([root], 'root', { + groups: [child], + tasks: [childTask], + }); + + const workQueue = await collectWorkRun({ + groups: [root], + tasks: [rootTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [root], + initialStreams: [], + events: [ + 
{ + kind: 'GROUP_VALUES', + group: root, + values: ['root'], + }, + { + kind: 'GROUP_SUCCESS', + group: root, + newGroups: [child], + newStreams: [], + }, + { + kind: 'GROUP_VALUES', + group: child, + values: ['child'], + }, + { + kind: 'GROUP_SUCCESS', + group: child, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + + expect(childRan).to.equal(true); + }); + + it('can handle child groups passed prior to parents', async () => { + const root: TestGroup = { parent: undefined }; + const child: TestGroup = { parent: root }; + + const childTask = makeTask([child], 'child', {}); + const rootTask = makeTask([root], 'root', {}); + + const workQueue = await collectWorkRun({ + groups: [child, root], + tasks: [rootTask, childTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [root], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: root, + values: ['root'], + }, + { + kind: 'GROUP_SUCCESS', + group: root, + newGroups: [child], + newStreams: [], + }, + { + kind: 'GROUP_VALUES', + group: child, + values: ['child'], + }, + { + kind: 'GROUP_SUCCESS', + group: child, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('propagates task failures and skips descendant groups', async () => { + const root: TestGroup = { parent: undefined }; + const child: TestGroup = { parent: root }; + const grandchild: TestGroup = { parent: child }; + let grandchildRan = false; + let childFailed = false; + + const grandchildTask = makeTask([grandchild], () => { + grandchildRan = true; + return { value: 'grandchild' }; + }); + + const boom = new Error('boom'); + const failingChildTask = makeTask([child], () => { + childFailed = true; + throw boom; + }); + + const rootTask = makeTask([root], 'root', { + groups: [child, grandchild], + tasks: [failingChildTask, grandchildTask], + }); + + const workQueue = await collectWorkRun({ + groups: [root], + tasks: [rootTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [root], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: root, + values: ['root'], + }, + { + kind: 'GROUP_SUCCESS', + group: root, + newGroups: [child], + newStreams: [], + }, + { + kind: 'GROUP_FAILURE', + group: child, + error: boom, + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + + expect(grandchildRan).to.equal(false); + expect(childFailed).to.equal(true); + }); + + it('integrates work object returned by task', async () => { + const root: TestGroup = { parent: undefined }; + const child: TestGroup = { parent: root }; + + const childTask = makeTask([child], () => ({ value: 'child' })); + const rootTask = makeTask([root], () => ({ + value: 'root', + work: { groups: [child], tasks: [childTask] }, + })); + + const workQueue = await collectWorkRun({ + groups: [root], + tasks: [rootTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [root], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: root, + values: ['root'], + }, + { + kind: 'GROUP_SUCCESS', + group: root, + newGroups: [child], + newStreams: [], + }, + { + kind: 'GROUP_VALUES', + group: child, + values: ['child'], + }, + { + kind: 'GROUP_SUCCESS', + group: child, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('purges shared tasks so sibling groups finish without re-running work', async () => { + const groupA: TestGroup = { parent: undefined }; + const groupB: TestGroup = { parent: undefined }; + + const 
sharedTask = makeTask([groupA, groupB], 'shared'); + + const workQueue = await collectWorkRun({ + groups: [groupA, groupB], + tasks: [sharedTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [groupA, groupB], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: groupA, + values: ['shared'], + }, + { + kind: 'GROUP_SUCCESS', + group: groupA, + newGroups: [], + newStreams: [], + }, + { + kind: 'GROUP_SUCCESS', + group: groupB, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('ignores task-emitted groups without a valid parent', async () => { + const root: TestGroup = { parent: undefined }; + + const orphanGroup: TestGroup = { parent: undefined }; + const task = makeTask([root], 'root', { + groups: [orphanGroup], + }); + + const workQueue = await collectWorkRun({ + groups: [root], + tasks: [task], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [root], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: root, + values: ['root'], + }, + { + kind: 'GROUP_SUCCESS', + group: root, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('skips child groups with only completed tasks when parent finishes later', async () => { + const parent1: TestGroup = { parent: undefined }; + const parent2: TestGroup = { parent: undefined }; + const child1: TestGroup = { parent: parent1 }; + const child2: TestGroup = { parent: parent2 }; + + const parent1Task = makeTask([parent1], 'parent1'); + const slowParent2Task = makeTask([parent2], async () => { + await resolveOnNextTick(); + return { value: 'parent2-slow' }; + }); + const sharedChildTask = makeTask([child1, child2], 'child-shared'); + const slowChild1FollowUpTask = makeTask([child1], async () => { + await resolveOnNextTick(); + await resolveOnNextTick(); + await resolveOnNextTick(); + return { value: 'child1-slow' }; + }); + + const workQueue = await collectWorkRun({ + groups: [parent1, parent2, child1, child2], + tasks: [ + parent1Task, + slowParent2Task, + sharedChildTask, + slowChild1FollowUpTask, + ], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [parent1, parent2], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: parent1, + values: ['parent1'], + }, + { + kind: 'GROUP_SUCCESS', + group: parent1, + newGroups: [child1], + newStreams: [], + }, + { + kind: 'GROUP_VALUES', + group: parent2, + values: ['parent2-slow'], + }, + { + kind: 'GROUP_SUCCESS', + group: parent2, + newGroups: [], + newStreams: [], + }, + { + kind: 'GROUP_VALUES', + group: child2, + values: ['child-shared', 'child1-slow'], + }, + { + kind: 'GROUP_SUCCESS', + group: child2, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('skips promoted child groups that already completed shared tasks', async () => { + const parent: TestGroup = { parent: undefined }; + const child: TestGroup = { parent }; + + const parentTask = makeTask([parent], async () => { + await resolveOnNextTick(); + await resolveOnNextTick(); + return { value: 'parent' }; + }); + + const sharedTask = makeTask([parent, child], 'shared'); + + const workQueue = await collectWorkRun({ + groups: [parent, child], + tasks: [parentTask, sharedTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [parent], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: parent, + values: ['parent', 'shared'], + }, + { + kind: 'GROUP_SUCCESS', + group: parent, + 
newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('skips child groups with shared tasks completed by a parent', async () => { + const parent: TestGroup = { parent: undefined }; + const child: TestGroup = { parent }; + const otherRoot: TestGroup = { parent: undefined }; + + const parentTask = makeTask([parent], async () => { + await resolveOnNextTick(); + await resolveOnNextTick(); + return { value: 'parent' }; + }); + + const sharedTask = makeTask([child, otherRoot], 'shared'); + const slowOtherRootTask = makeTask([otherRoot], async () => { + await resolveOnNextTick(); + await resolveOnNextTick(); + await resolveOnNextTick(); + await resolveOnNextTick(); + return { value: 'other-root' }; + }); + + const workQueue = await collectWorkRun({ + groups: [parent, otherRoot, child], + tasks: [parentTask, sharedTask, slowOtherRootTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [parent, otherRoot], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: parent, + values: ['parent'], + }, + { + kind: 'GROUP_SUCCESS', + group: parent, + newGroups: [], + newStreams: [], + }, + { + kind: 'GROUP_VALUES', + group: otherRoot, + values: ['shared', 'other-root'], + }, + { + kind: 'GROUP_SUCCESS', + group: otherRoot, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('does not promote child groups that only share work with the parent', async () => { + const parent: TestGroup = { parent: undefined }; + const child: TestGroup = { parent }; + + const sharedTask = makeTask([parent, child], 'shared'); + const parentOnlyTask = makeTask([parent], async () => { + await resolveOnNextTick(); + return { value: 'parent-only' }; + }); + + const workQueue = await collectWorkRun({ + groups: [parent, child], + tasks: [sharedTask, parentOnlyTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [parent], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: parent, + values: ['shared', 'parent-only'], + }, + { + kind: 'GROUP_SUCCESS', + group: parent, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('ignores work returned by tasks whose groups already failed', async () => { + const group: TestGroup = { parent: undefined }; + const lateGroup: TestGroup = { parent: undefined }; + + const failing = makeTask([group], () => { + throw new Error('fail early'); + }); + + const slow = makeTask([group], async () => { + await resolveOnNextTick(); + return { value: 'late', work: { groups: [lateGroup] } }; + }); + + const workQueue = await collectWorkRun({ + groups: [group], + tasks: [failing, slow], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [group], + initialStreams: [], + events: [ + { + kind: 'GROUP_FAILURE', + group, + error: new Error('fail early'), + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('defers task-emitted streams until the parent group succeeds', async () => { + const parent: TestGroup = { parent: undefined }; + const deferredStream = streamFrom([{ value: 7 }]); + + const parentTask = makeTask([parent], 'parent', { + streams: [deferredStream], + }); + + const workQueue = await collectWorkRun({ + groups: [parent], + tasks: [parentTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [parent], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: parent, + values: ['parent'], + }, + { + kind: 'GROUP_SUCCESS', + group: parent, + newStreams: 
[deferredStream], + newGroups: [], + }, + { + kind: 'STREAM_VALUES', + stream: deferredStream, + values: [7], + newGroups: [], + newStreams: [], + }, + { kind: 'STREAM_SUCCESS', stream: deferredStream }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('only promotes shared streams after all parent groups finish', async () => { + const groupA: TestGroup = { parent: undefined }; + const groupB: TestGroup = { parent: undefined }; + const sharedStream = streamFrom([{ value: 1 }]); + + const sharedTask = makeTask([groupA, groupB], 'shared', { + streams: [sharedStream], + }); + + const workQueue = await collectWorkRun({ + groups: [groupA, groupB], + tasks: [sharedTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [groupA, groupB], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: groupA, + values: ['shared'], + }, + { + kind: 'GROUP_SUCCESS', + group: groupA, + newStreams: [sharedStream], + newGroups: [], + }, + { + kind: 'GROUP_SUCCESS', + group: groupB, + newGroups: [], + newStreams: [], + }, + { + kind: 'STREAM_VALUES', + stream: sharedStream, + values: [1], + newGroups: [], + newStreams: [], + }, + { kind: 'STREAM_SUCCESS', stream: sharedStream }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('starts a shared stream once even when the second parent is slower', async () => { + const groupA: TestGroup = { parent: undefined }; + const groupB: TestGroup = { parent: undefined }; + const sharedStream = streamFrom([{ value: 5 }]); + + const sharedTask = makeTask([groupA, groupB], 'shared', { + streams: [sharedStream], + }); + const fastTaskA = makeTask([groupA], 'A-only'); + + const slowTaskB = makeTask([groupB], async () => { + await new Promise((resolve) => setTimeout(resolve, 0)); + return { value: 'B-slow' }; + }); + + const workQueue = await collectWorkRun({ + groups: [groupA, groupB], + tasks: [sharedTask, fastTaskA, slowTaskB], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [groupA, groupB], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group: groupA, + values: ['shared', 'A-only'], + }, + { + kind: 'GROUP_SUCCESS', + group: groupA, + newStreams: [sharedStream], + newGroups: [], + }, + { + kind: 'STREAM_VALUES', + stream: sharedStream, + values: [5], + newGroups: [], + newStreams: [], + }, + { kind: 'STREAM_SUCCESS', stream: sharedStream }, + { + kind: 'GROUP_VALUES', + group: groupB, + values: ['B-slow'], + }, + { + kind: 'GROUP_SUCCESS', + group: groupB, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('does not promote a child stream if the parent fails', async () => { + const group: TestGroup = { parent: undefined }; + const stream = streamFrom([{ value: 99 }]); + + const task = makeTask([group], 'task', { + streams: [stream], + }); + const boom = new Error('boom'); + const failingTask = makeTask([group], () => { + throw boom; + }); + + const workQueue = await collectWorkRun({ + groups: [group], + tasks: [task, failingTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [group], + initialStreams: [], + events: [ + { + kind: 'GROUP_FAILURE', + group, + error: boom, + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('promotes a stream with multiple parents when only a single parent fails', async () => { + const groupA: TestGroup = { parent: undefined }; + const groupB: TestGroup = { parent: undefined }; + const sharedStream = streamFrom([{ value: 99 }]); + + const sharedTask = makeTask([groupA, groupB], 
'shared', { + streams: [sharedStream], + }); + const boom = new Error('boom'); + const failingTask = makeTask([groupA], () => { + throw boom; + }); + const slowTaskB = makeTask([groupB], async () => { + await resolveOnNextTick(); + return { value: 'B-resolved' }; + }); + + const workQueue = await collectWorkRun({ + groups: [groupA, groupB], + tasks: [sharedTask, failingTask, slowTaskB], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [groupA, groupB], + initialStreams: [], + events: [ + { + kind: 'GROUP_FAILURE', + group: groupA, + error: boom, + }, + { + kind: 'GROUP_VALUES', + group: groupB, + values: ['shared', 'B-resolved'], + }, + { + kind: 'GROUP_SUCCESS', + group: groupB, + newStreams: [sharedStream], + newGroups: [], + }, + { + kind: 'STREAM_VALUES', + stream: sharedStream, + values: [99], + newGroups: [], + newStreams: [], + }, + { + kind: 'STREAM_SUCCESS', + stream: sharedStream, + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('emits stream items followed by success', async () => { + const stream = streamFrom([{ value: 1 }, { value: 2 }, { value: 3 }]); + + const workQueue = await collectWorkRun({ streams: [stream] }); + + expect(workQueue).to.deep.equal({ + initialGroups: [], + initialStreams: [stream], + events: [ + { + kind: 'STREAM_VALUES', + stream, + values: [1], + newGroups: [], + newStreams: [], + }, + { + kind: 'STREAM_VALUES', + stream, + values: [2], + newGroups: [], + newStreams: [], + }, + { + kind: 'STREAM_VALUES', + stream, + values: [3], + newGroups: [], + newStreams: [], + }, + { kind: 'STREAM_SUCCESS', stream }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('handles batched stream items', async () => { + const spawned: TestGroup = { parent: undefined }; + const spawnedTask = makeTask([spawned], 'spawned-from-stream'); + + const stream = streamFrom( + [ + { value: 1 }, + { + value: 2, + work: { groups: [spawned], tasks: [spawnedTask] }, + }, + ], + { initialCapacity: 2 }, + ); + + const workQueue = await collectWorkRun({ streams: [stream] }); + + expect(workQueue).to.deep.equal({ + initialGroups: [], + initialStreams: [stream], + events: [ + { + kind: 'STREAM_VALUES', + stream, + values: [1, 2], + newGroups: [spawned], + newStreams: [], + }, + { + kind: 'GROUP_VALUES', + group: spawned, + values: ['spawned-from-stream'], + }, + { + kind: 'GROUP_SUCCESS', + group: spawned, + newGroups: [], + newStreams: [], + }, + { kind: 'STREAM_SUCCESS', stream }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('emits stream failure when the iterator throws', async () => { + const brokenStreamError = new Error('broken stream'); + const failingStream = streamFrom([{ value: 42 }], { + throwAfter: true, + error: brokenStreamError, + }); + + const workQueue = await collectWorkRun({ streams: [failingStream] }); + + expect(workQueue).to.deep.equal({ + initialGroups: [], + initialStreams: [failingStream], + events: [ + { + kind: 'STREAM_VALUES', + stream: failingStream, + values: [42], + newGroups: [], + newStreams: [], + }, + { + kind: 'STREAM_FAILURE', + stream: failingStream, + error: brokenStreamError, + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('emits stream success in a later payload when stream is slow to stop', async () => { + const stream: TestStream = { + queue: new Queue(async ({ push, stop }) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + push({ value: 1 }); + await new Promise((resolve) => setTimeout(resolve, 0)); + stop(); + }, 1), + }; + + const workQueue = 
await collectWorkRun({ + streams: [stream], + }); + + expect(workQueue.events).to.deep.equal([ + { + kind: 'STREAM_VALUES', + stream, + values: [1], + newGroups: [], + newStreams: [], + }, + { kind: 'STREAM_SUCCESS', stream }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ]); + }); + + it('emits late root groups and streams immediately when triggered from stream', async () => { + const initial: TestGroup = { parent: undefined }; + const lateGroup: TestGroup = { parent: undefined }; + + const lateTask = makeTask([lateGroup], 'late'); + const secondaryStream = streamFrom([{ value: 9 }]); + const triggerStream = streamFrom([ + { + value: 0, + work: { + groups: [lateGroup], + tasks: [lateTask], + streams: [secondaryStream], + }, + }, + ]); + + const initialTask = makeTask([initial], 'initial'); + + const workQueue = await collectWorkRun({ + groups: [initial], + tasks: [initialTask], + streams: [triggerStream], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [initial], + initialStreams: [triggerStream], + events: [ + { + kind: 'GROUP_VALUES', + group: initial, + values: ['initial'], + }, + { + kind: 'GROUP_SUCCESS', + group: initial, + newGroups: [], + newStreams: [], + }, + { + kind: 'STREAM_VALUES', + stream: triggerStream, + values: [0], + newGroups: [lateGroup], + newStreams: [secondaryStream], + }, + { + kind: 'GROUP_VALUES', + group: lateGroup, + values: ['late'], + }, + { + kind: 'GROUP_SUCCESS', + group: lateGroup, + newGroups: [], + newStreams: [], + }, + { + kind: 'STREAM_VALUES', + stream: secondaryStream, + values: [9], + newGroups: [], + newStreams: [], + }, + { kind: 'STREAM_SUCCESS', stream: triggerStream }, + { kind: 'STREAM_SUCCESS', stream: secondaryStream }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + it('handles tasks that are started manually before they complete', async () => { + const group: TestGroup = { parent: undefined }; + const primedTask = makeTask([group], async () => { + await resolveOnNextTick(); + return { value: 'primed' }; + }); + + primedTask.start(); + + const workQueue = await collectWorkRun({ + groups: [group], + tasks: [primedTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [group], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group, + values: ['primed'], + }, + { + kind: 'GROUP_SUCCESS', + group, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('propagates failures for tasks started manually', async () => { + const group: TestGroup = { parent: undefined }; + const primedFailure = new Error('primed failure'); + const primedTask = makeTask([group], async () => { + await resolveOnNextTick(); + throw primedFailure; + }); + + primedTask.start(); + + const workQueue = await collectWorkRun({ + groups: [group], + tasks: [primedTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [group], + initialStreams: [], + events: [ + { + kind: 'GROUP_FAILURE', + group, + error: primedFailure, + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('skips groups with no tasks and promotes descendants', async () => { + const root: TestGroup = { parent: undefined }; + const emptyParent: TestGroup = { parent: root }; + const leaf: TestGroup = { parent: emptyParent }; + + const leafTask = makeTask([leaf], 'leaf'); + + const workQueue = await collectWorkRun({ + groups: [root, emptyParent, leaf], + tasks: [leafTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [leaf], + initialStreams: [], + events: [ + { + kind: 
'GROUP_VALUES', + group: leaf, + values: ['leaf'], + }, + { + kind: 'GROUP_SUCCESS', + group: leaf, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); + + it('handles tasks that are already settled before being queued', async () => { + const group: TestGroup = { parent: undefined }; + const eagerTask = makeTask([group], 'eager'); + + eagerTask.start(); + + const workQueue = await collectWorkRun({ + groups: [group], + tasks: [eagerTask], + }); + + expect(workQueue).to.deep.equal({ + initialGroups: [group], + initialStreams: [], + events: [ + { + kind: 'GROUP_VALUES', + group, + values: ['eager'], + }, + { + kind: 'GROUP_SUCCESS', + group, + newGroups: [], + newStreams: [], + }, + { kind: 'WORK_QUEUE_TERMINATION' }, + ], + }); + }); +}); diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index 8b2071a87a..2ea150b28c 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -1766,6 +1766,80 @@ describe('Execute: defer directive', () => { ]); }); + it('Handles cancelling child deferred fragments if parent fragment fails', async () => { + const document = parse(` + query { + ... @defer { + a { + someField + b { + c { + nonNullErrorField + } + } + } + ... @defer { + a { + someField + } + } + } + a { + ... @defer { + b { + c { + d + } + } + } + } + } + `); + const result = await complete(document, { + a: { b: { c: { d: 'd' } }, someField: 'someField' }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + a: {}, + }, + pending: [ + { id: '0', path: ['a'] }, + { id: '1', path: [] }, + ], + hasNext: true, + }, + { + incremental: [ + { + data: { b: { c: {} } }, + id: '0', + }, + { + data: { d: 'd' }, + id: '0', + subPath: ['b', 'c'], + }, + ], + completed: [ + { + id: '1', + errors: [ + { + message: + 'Cannot return null for non-nullable field c.nonNullErrorField.', + locations: [{ line: 8, column: 17 }], + path: ['a', 'b', 'c', 'nonNullErrorField'], + }, + ], + }, + { id: '0' }, + ], + hasNext: false, + }, + ]); + }); + it('Handles multiple erroring deferred grouped field sets', async () => { const document = parse(` query { diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 305dd74d43..24a5df4fc0 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -369,16 +369,10 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [{ name: 'Han', id: '2' }], - id: '0', - }, - ], - hasNext: true, - }, - { - incremental: [ - { - items: [{ name: 'Leia', id: '3' }], + items: [ + { name: 'Han', id: '2' }, + { name: 'Leia', id: '3' }, + ], id: '0', }, ], @@ -682,22 +676,13 @@ describe('Execute: stream directive', () => { { incremental: [ { - items: [{ name: 'Han', id: '2' }], - id: '0', - }, - ], - hasNext: true, - }, - { - incremental: [ - { - items: [{ name: 'Leia', id: '3' }], + items: [ + { name: 'Han', id: '2' }, + { name: 'Leia', id: '3' }, + ], id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -737,9 +722,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -833,9 +815,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -903,7 +882,7 @@ describe('Execute: stream directive', () => { } } `); - const result = await completeAsync(document, 3, { + const result = 
await completeAsync(document, 2, { async *friendList() { yield await Promise.resolve(friends[0]); yield await Promise.resolve(friends[1]); @@ -933,12 +912,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - }, - { - done: false, - value: { completed: [{ id: '0' }], hasNext: false, }, @@ -1369,9 +1342,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1674,11 +1644,7 @@ describe('Execute: stream directive', () => { ], }, ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - completed: [{ id: '1' }], + completed: [{ id: '1' }, { id: '0' }], hasNext: false, }, ]); @@ -1784,9 +1750,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1939,9 +1902,6 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, @@ -1999,14 +1959,110 @@ describe('Execute: stream directive', () => { id: '0', }, ], - hasNext: true, - }, - { completed: [{ id: '0' }], hasNext: false, }, ]); }); + it('Re-promotes a completed stream when a slower sibling defer resolves later', async () => { + const { promise: slowFieldPromise, resolve: resolveSlowField } = + promiseWithResolvers(); + const document = parse(` + query { + nestedObject { + ... @defer { + nestedFriendList @stream { name } + } + ... @defer { + scalarField + nestedFriendList @stream { name } + } + } + } + `); + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + nestedObject: { + nestedFriendList: () => friends, + scalarField: slowFieldPromise, + }, + }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + nestedObject: {}, + }, + pending: [ + { id: '0', path: ['nestedObject'] }, + { id: '1', path: ['nestedObject'] }, + ], + hasNext: true, + }); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + value: { + pending: [{ id: '2', path: ['nestedObject', 'nestedFriendList'] }], + incremental: [ + { + data: { + nestedFriendList: [], + }, + id: '0', + }, + ], + completed: [{ id: '0' }], + hasNext: true, + }, + done: false, + }); + + resolveSlowField('slow'); + + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + value: { + incremental: [ + { + items: [{ name: 'Luke' }, { name: 'Han' }, { name: 'Leia' }], + id: '2', + }, + ], + completed: [{ id: '2' }], + hasNext: true, + }, + done: false, + }); + + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ + value: { + incremental: [ + { + data: { + scalarField: 'slow', + }, + id: '1', + }, + ], + completed: [{ id: '1' }], + hasNext: false, + }, + done: false, + }); + + const result5 = await iterator.next(); + expectJSON(result5).toDeepEqual({ + done: true, + value: undefined, + }); + }); it('Returns payloads in correct order when parent deferred fragment resolves slower than stream', async () => { const { promise: slowFieldPromise, resolve: resolveSlowField } = promiseWithResolvers(); @@ -2071,39 +2127,18 @@ describe('Execute: stream directive', () => { value: { incremental: [ { - items: [{ name: 'Luke' }], + items: [{ name: 'Luke' }, { name: 'Han' }], id: '1', }, ], - hasNext: true, + completed: [{ id: '1' }], + hasNext: false, }, done: false, 
}); const result4 = await iterator.next(); expectJSON(result4).toDeepEqual({ - value: { - incremental: [ - { - items: [{ name: 'Han' }], - id: '1', - }, - ], - hasNext: true, - }, - done: false, - }); - - const result5 = await iterator.next(); - expectJSON(result5).toDeepEqual({ - value: { - completed: [{ id: '1' }], - hasNext: false, - }, - done: false, - }); - const result6 = await iterator.next(); - expectJSON(result6).toDeepEqual({ value: undefined, done: true, }); @@ -2188,20 +2223,13 @@ describe('Execute: stream directive', () => { id: '0', }, ], + completed: [{ id: '0' }], hasNext: true, }, done: false, }); const result4 = await iterator.next(); expectJSON(result4).toDeepEqual({ - value: { - completed: [{ id: '0' }], - hasNext: true, - }, - done: false, - }); - const result5 = await iterator.next(); - expectJSON(result5).toDeepEqual({ value: { incremental: [ { @@ -2214,8 +2242,8 @@ describe('Execute: stream directive', () => { }, done: false, }); - const result6 = await iterator.next(); - expectJSON(result6).toDeepEqual({ + const result5 = await iterator.next(); + expectJSON(result5).toDeepEqual({ value: undefined, done: true, }); @@ -2557,7 +2585,7 @@ describe('Execute: stream directive', () => { value: { incremental: [ { - items: [{ id: '1' }], + items: [{ id: '1' }, { id: '2' }], id: '0', }, ], @@ -2568,4 +2596,98 @@ describe('Execute: stream directive', () => { assert(returned); }); + it('limits stream batches to the configured capacity (10)', async () => { + const document = parse(` + query { + friendList @stream { + id + } + } + `); + + const executeResult = await experimentalExecuteIncrementally({ + schema, + document, + rootValue: { + async *friendList() { + for (let i = 0; i < 18; i++) { + // eslint-disable-next-line no-await-in-loop + yield await Promise.resolve(friends[i % 3]); + } + }, + }, + enableEarlyExecution: true, + options: { streamQueueCapacity: 10 }, + }); + assert('initialResult' in executeResult); + const iterator = executeResult.subsequentResults[Symbol.asyncIterator](); + + const result1 = executeResult.initialResult; + expectJSON(result1).toDeepEqual({ + data: { + friendList: [], + }, + pending: [{ id: '0', path: ['friendList'] }], + hasNext: true, + }); + + await new Promise((resolve) => setTimeout(resolve, 5)); + + const result2 = await iterator.next(); + expectJSON(result2).toDeepEqual({ + done: false, + value: { + incremental: [ + { + items: [ + { id: '1' }, + { id: '2' }, + { id: '3' }, + { id: '1' }, + { id: '2' }, + { id: '3' }, + { id: '1' }, + { id: '2' }, + { id: '3' }, + { id: '1' }, + ], + id: '0', + }, + ], + hasNext: true, + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 5)); + + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ + done: false, + value: { + incremental: [ + { + items: [ + { id: '2' }, + { id: '3' }, + { id: '1' }, + { id: '2' }, + { id: '3' }, + { id: '1' }, + { id: '2' }, + { id: '3' }, + ], + id: '0', + }, + ], + completed: [{ id: '0' }], + hasNext: false, + }, + }); + + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ + done: true, + value: undefined, + }); + }); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index cab7dc4a36..8953f7e4ed 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1,4 +1,3 @@ -import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import { inspect } from '../jsutils/inspect.js'; import { invariant } from '../jsutils/invariant.js'; import { isAsyncIterable } from 
'../jsutils/isAsyncIterable.js'; @@ -73,15 +72,17 @@ import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import { Queue } from './Queue.js'; import type { - CompletedExecutionGroup, + DeferredFragmentRecord, + ExecutionGroup, + ExecutionGroupResult, ExecutionResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, - PendingExecutionGroup, + IncrementalWork, StreamItemResult, StreamRecord, } from './types.js'; -import { DeferredFragmentRecord } from './types.js'; +import { isExecutionGroup } from './types.js'; import type { VariableValues } from './values.js'; import { getArgumentValues, @@ -89,6 +90,7 @@ import { getVariableValues, } from './values.js'; import { withCleanup } from './withCleanup.js'; +import { Task } from './WorkQueue.js'; /* eslint-disable max-params */ // This file contains a lot of such errors but we plan to refactor it anyway @@ -429,11 +431,14 @@ function buildDataResponse( return errors.length ? { errors, data } : { data }; } + const work = buildWorkFromIncrementalPayload( + newDeferredFragmentRecords, + incrementalDataRecords, + ); return buildIncrementalResponse( data, errors, - newDeferredFragmentRecords, - incrementalDataRecords, + work, (exeContext.earlyReturns ??= new Map()), exeContext.abortSignalListener, ); @@ -615,7 +620,7 @@ function executeRootExecutionPlan( ); if (newGroupedFieldSets.size > 0) { - const newPendingExecutionGroups = collectExecutionGroups( + const newExecutionGroups = collectExecutionGroups( exeContext, rootType, rootValue, @@ -625,10 +630,7 @@ function executeRootExecutionPlan( newDeferMap, ); - return withNewExecutionGroups( - graphqlWrappedResult, - newPendingExecutionGroups, - ); + return withNewExecutionGroups(graphqlWrappedResult, newExecutionGroups); } return graphqlWrappedResult; } @@ -663,16 +665,16 @@ function addNewDeferredFragmentRecords( function withNewExecutionGroups( result: PromiseOrValue>>, - newPendingExecutionGroups: ReadonlyArray, + newExecutionGroups: ReadonlyArray, ): PromiseOrValue>> { if (isPromise(result)) { return result.then((resolved) => { - addIncrementalDataRecords(resolved, newPendingExecutionGroups); + addIncrementalDataRecords(resolved, newExecutionGroups); return resolved; }); } - addIncrementalDataRecords(result, newPendingExecutionGroups); + addIncrementalDataRecords(result, newExecutionGroups); return result; } @@ -1358,7 +1360,7 @@ async function completeAsyncIteratorValue( try { while (true) { if (streamUsage && index >= streamUsage.initialCount) { - const streamItemQueue = buildStreamItemQueue( + const queue = buildStreamItemQueue( index, path, asyncIterator, @@ -1371,7 +1373,7 @@ async function completeAsyncIteratorValue( const streamRecord: StreamRecord = { label: streamUsage.label, path, - streamItemQueue, + queue, }; if (asyncIterator.return !== undefined) { exeContext.earlyReturns ??= new Map(); @@ -1546,7 +1548,7 @@ function completeIterableValue( const syncStreamRecord: StreamRecord = { label: streamUsage.label, path, - streamItemQueue: buildStreamItemQueue( + queue: buildStreamItemQueue( index, path, iterator, @@ -1968,11 +1970,11 @@ function getNewDeferMap( : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. - const deferredFragmentRecord = new DeferredFragmentRecord( + const deferredFragmentRecord: DeferredFragmentRecord = { path, - newDeferUsage.label, + label: newDeferUsage.label, parent, - ); + }; // Add the new record to the list of new records. 
newDeferredFragmentRecords.push(deferredFragmentRecord); @@ -2085,7 +2087,7 @@ function executeSubExecutionPlan( } if (newGroupedFieldSets.size > 0) { - const newPendingExecutionGroups = collectExecutionGroups( + const newExecutionGroups = collectExecutionGroups( exeContext, returnType, sourceValue, @@ -2095,10 +2097,7 @@ function executeSubExecutionPlan( newDeferMap, ); - return withNewExecutionGroups( - graphqlWrappedResult, - newPendingExecutionGroups, - ); + return withNewExecutionGroups(graphqlWrappedResult, newExecutionGroups); } return graphqlWrappedResult; } @@ -2476,8 +2475,8 @@ function collectExecutionGroups( parentDeferUsages: DeferUsageSet | undefined, newGroupedFieldSets: Map, deferMap: ReadonlyMap, -): ReadonlyArray { - const newPendingExecutionGroups: Array = []; +): ReadonlyArray { + const newExecutionGroups: Array = []; for (const [deferUsageSet, groupedFieldSet] of newGroupedFieldSets) { const deferredFragmentRecords = getDeferredFragmentRecords( @@ -2485,42 +2484,38 @@ function collectExecutionGroups( deferMap, ); - const pendingExecutionGroup: PendingExecutionGroup = { + const executionGroup = new Task( + () => + executeExecutionGroup( + deferredFragmentRecords, + exeContext, + parentType, + sourceValue, + path, + groupedFieldSet, + { + errors: [], + completed: false, + deferUsageSet, + }, + deferMap, + ), deferredFragmentRecords, - result: - undefined as unknown as BoxedPromiseOrValue, - }; - - const executor = () => - executeExecutionGroup( - pendingExecutionGroup, - exeContext, - parentType, - sourceValue, - path, - groupedFieldSet, - { - errors: [], - completed: false, - deferUsageSet, - }, - deferMap, - ); + ); if (exeContext.validatedExecutionArgs.enableEarlyExecution) { - pendingExecutionGroup.result = new BoxedPromiseOrValue( - shouldDefer(parentDeferUsages, deferUsageSet) - ? 
Promise.resolve().then(executor) - : executor(), - ); - } else { - pendingExecutionGroup.result = () => new BoxedPromiseOrValue(executor()); + if (shouldDefer(parentDeferUsages, deferUsageSet)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + Promise.resolve().then(() => executionGroup.start()); + } else { + executionGroup.start(); + } } - newPendingExecutionGroups.push(pendingExecutionGroup); + newExecutionGroups.push(executionGroup); } - return newPendingExecutionGroups; + return newExecutionGroups; } function shouldDefer( @@ -2540,7 +2535,7 @@ function shouldDefer( } function executeExecutionGroup( - pendingExecutionGroup: PendingExecutionGroup, + deferredFragmentRecords: ReadonlyArray, exeContext: ExecutionContext, parentType: GraphQLObjectType, sourceValue: unknown, @@ -2548,7 +2543,7 @@ function executeExecutionGroup( groupedFieldSet: GroupedFieldSet, incrementalContext: IncrementalContext, deferMap: ReadonlyMap, -): PromiseOrValue { +): PromiseOrValue { let result; try { result = executeFields( @@ -2562,61 +2557,56 @@ function executeExecutionGroup( ); } catch (error) { incrementalContext.completed = true; - return { - pendingExecutionGroup, - path: pathToArray(path), - errors: [...incrementalContext.errors, error], - }; + throw error; } if (isPromise(result)) { return result.then( (resolved) => { incrementalContext.completed = true; - return buildCompletedExecutionGroup( + return buildExecutionGroupResult( + deferredFragmentRecords, incrementalContext.errors, - pendingExecutionGroup, path, resolved, ); }, (error: unknown) => { incrementalContext.completed = true; - return { - pendingExecutionGroup, - path: pathToArray(path), - errors: [...incrementalContext.errors, error as GraphQLError], - }; + throw error; }, ); } incrementalContext.completed = true; - return buildCompletedExecutionGroup( + return buildExecutionGroupResult( + deferredFragmentRecords, incrementalContext.errors, - pendingExecutionGroup, path, result, ); } -function buildCompletedExecutionGroup( +function buildExecutionGroupResult( + deferredFragmentRecords: ReadonlyArray, errors: ReadonlyArray, - pendingExecutionGroup: PendingExecutionGroup, path: Path | undefined, result: GraphQLWrappedResult>, -): CompletedExecutionGroup { +): ExecutionGroupResult { const { rawResult: data, newDeferredFragmentRecords, incrementalDataRecords, } = result; - return { - pendingExecutionGroup, - path: pathToArray(path), - result: errors.length ? { errors, data } : { data }, + const work = buildWorkFromIncrementalPayload( newDeferredFragmentRecords, incrementalDataRecords, + ); + return { + value: errors.length + ? { deferredFragmentRecords, path: pathToArray(path), errors, data } + : { deferredFragmentRecords, path: pathToArray(path), data }, + work, }; } @@ -2629,6 +2619,27 @@ function getDeferredFragmentRecords( ); } +function buildWorkFromIncrementalPayload( + newDeferredFragmentRecords: ReadonlyArray | undefined, + incrementalDataRecords: ReadonlyArray | undefined, +): IncrementalWork { + const groups = newDeferredFragmentRecords ?? 
[]; + const tasks: Array = []; + const streams: Array = []; + + if (incrementalDataRecords !== undefined) { + for (const incrementalDataRecord of incrementalDataRecords) { + if (isExecutionGroup(incrementalDataRecord)) { + tasks.push(incrementalDataRecord); + } else { + streams.push(incrementalDataRecord); + } + } + } + + return { groups, tasks, streams }; +} + function buildStreamItemQueue( initialIndex: number, streamPath: Path, @@ -2807,10 +2818,11 @@ function buildStreamItemResult( newDeferredFragmentRecords, incrementalDataRecords, } = result; - return { - item, - errors, + const work = buildWorkFromIncrementalPayload( newDeferredFragmentRecords, incrementalDataRecords, - }; + ); + return errors.length > 0 + ? { value: { item, errors }, work } + : { value: { item }, work }; } diff --git a/src/execution/types.ts b/src/execution/types.ts index 2d6399fbb8..a4aac9fa23 100644 --- a/src/execution/types.ts +++ b/src/execution/types.ts @@ -1,4 +1,3 @@ -import type { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; @@ -7,7 +6,8 @@ import type { GraphQLFormattedError, } from '../error/GraphQLError.js'; -import type { Queue } from './Queue.js'; +import type { Group, Stream, Work } from './WorkQueue.js'; +import { Task } from './WorkQueue.js'; /** * The result of GraphQL execution. @@ -91,17 +91,14 @@ export interface FormattedSubsequentIncrementalExecutionResult< extensions?: TExtensions; } -interface ExecutionGroupResult> { - errors?: ReadonlyArray; - data: TData; -} - export interface IncrementalDeferResult< TData = ObjMap, TExtensions = ObjMap, -> extends ExecutionGroupResult { +> { id: string; subPath?: ReadonlyArray; + errors?: ReadonlyArray; + data: TData; extensions?: TExtensions; } @@ -116,17 +113,14 @@ export interface FormattedIncrementalDeferResult< extensions?: TExtensions; } -interface StreamItemsRecordResult> { - errors?: ReadonlyArray; - items: TData; -} - export interface IncrementalStreamResult< TData = ReadonlyArray, TExtensions = ObjMap, -> extends StreamItemsRecordResult { +> { id: string; subPath?: ReadonlyArray; + errors?: ReadonlyArray; + items: TData; extensions?: TExtensions; } @@ -168,113 +162,64 @@ export interface FormattedCompletedResult { errors?: ReadonlyArray; } -export function isPendingExecutionGroup( +export function isExecutionGroup( incrementalDataRecord: IncrementalDataRecord, -): incrementalDataRecord is PendingExecutionGroup { - return 'deferredFragmentRecords' in incrementalDataRecord; -} - -export type CompletedExecutionGroup = - | SuccessfulExecutionGroup - | FailedExecutionGroup; - -export function isCompletedExecutionGroup( - incrementalDataRecordResult: IncrementalDataRecordResult, -): incrementalDataRecordResult is CompletedExecutionGroup { - return 'pendingExecutionGroup' in incrementalDataRecordResult; -} - -export interface SuccessfulExecutionGroup { - pendingExecutionGroup: PendingExecutionGroup; - path: Array; - result: ExecutionGroupResult; - newDeferredFragmentRecords: ReadonlyArray | undefined; - incrementalDataRecords: ReadonlyArray | undefined; - errors?: never; -} - -interface FailedExecutionGroup { - pendingExecutionGroup: PendingExecutionGroup; - path: Array; - errors: ReadonlyArray; - result?: never; -} - -export function isFailedExecutionGroup( - completedExecutionGroup: CompletedExecutionGroup, -): completedExecutionGroup is FailedExecutionGroup { - return completedExecutionGroup.errors !== undefined; -} - -type 
ThunkIncrementalResult = - | BoxedPromiseOrValue - | (() => BoxedPromiseOrValue); - -export interface PendingExecutionGroup { - deferredFragmentRecords: ReadonlyArray; - result: ThunkIncrementalResult; +): incrementalDataRecord is ExecutionGroup { + return incrementalDataRecord instanceof Task; } -export type DeliveryGroup = DeferredFragmentRecord | StreamRecord; +export type ExecutionGroup = Task< + ExecutionGroupValue, + StreamItemValue, + DeferredFragmentRecord, + StreamRecord +>; /** @internal */ -export class DeferredFragmentRecord { +export interface DeferredFragmentRecord extends Group { path: Path | undefined; label: string | undefined; - id?: string | undefined; parent: DeferredFragmentRecord | undefined; - pendingExecutionGroups: Set; - successfulExecutionGroups: Set; - children: Set; - - constructor( - path: Path | undefined, - label: string | undefined, - parent: DeferredFragmentRecord | undefined, - ) { - this.path = path; - this.label = label; - this.parent = parent; - this.pendingExecutionGroups = new Set(); - this.successfulExecutionGroups = new Set(); - this.children = new Set(); - } } -export function isDeferredFragmentRecord( - deliveryGroup: DeliveryGroup, -): deliveryGroup is DeferredFragmentRecord { - return deliveryGroup instanceof DeferredFragmentRecord; +export interface StreamRecord + extends Stream< + ExecutionGroupValue, + StreamItemValue, + DeferredFragmentRecord, + StreamRecord + > { + path: Path; + label: string | undefined; } -export interface StreamItemResult { - item: unknown; - newDeferredFragmentRecords?: - | ReadonlyArray - | undefined; - incrementalDataRecords?: ReadonlyArray | undefined; +export interface ExecutionGroupValue { + deferredFragmentRecords: ReadonlyArray; + path: ReadonlyArray; errors?: ReadonlyArray; + data: ObjMap; } -export interface StreamRecord { - path: Path; - label: string | undefined; - id?: string | undefined; - streamItemQueue: Queue; +export type IncrementalWork = Work< + ExecutionGroupValue, + StreamItemValue, + DeferredFragmentRecord, + StreamRecord +>; + +export interface ExecutionGroupResult { + value: ExecutionGroupValue; + work?: IncrementalWork | undefined; } -export interface StreamItemsResult { - streamRecord: StreamRecord; +export interface StreamItemValue { errors?: ReadonlyArray; - result?: StreamItemsRecordResult; - newDeferredFragmentRecords?: - | ReadonlyArray - | undefined; - incrementalDataRecords?: ReadonlyArray | undefined; + item: unknown; } -export type IncrementalDataRecord = PendingExecutionGroup | StreamRecord; +export interface StreamItemResult { + value: StreamItemValue; + work?: IncrementalWork | undefined; +} -export type IncrementalDataRecordResult = - | CompletedExecutionGroup - | StreamItemsResult; +export type IncrementalDataRecord = ExecutionGroup | StreamRecord; diff --git a/src/jsutils/BoxedPromiseOrValue.ts b/src/jsutils/BoxedPromiseOrValue.ts deleted file mode 100644 index 7f6f758270..0000000000 --- a/src/jsutils/BoxedPromiseOrValue.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { isPromise } from './isPromise.js'; -import type { PromiseOrValue } from './PromiseOrValue.js'; - -/** - * A BoxedPromiseOrValue is a container for a value or promise where the value - * will be updated when the promise resolves. - * - * A BoxedPromiseOrValue may only be used with promises whose possible - * rejection has already been handled, otherwise this will lead to unhandled - * promise rejections. 
- * - * @internal - * */ -export class BoxedPromiseOrValue { - value: PromiseOrValue; - - constructor(value: PromiseOrValue) { - this.value = value; - if (isPromise(value)) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - value.then((resolved) => { - this.value = resolved; - }); - } - } -} diff --git a/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts b/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts deleted file mode 100644 index b0fdbd8a83..0000000000 --- a/src/jsutils/__tests__/BoxedPromiseOrValue-test.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { expect } from 'chai'; -import { describe, it } from 'mocha'; - -import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; - -import { BoxedPromiseOrValue } from '../BoxedPromiseOrValue.js'; - -describe('BoxedPromiseOrValue', () => { - it('can box a value', () => { - const boxed = new BoxedPromiseOrValue(42); - - expect(boxed.value).to.equal(42); - }); - - it('can box a promise', () => { - const promise = Promise.resolve(42); - const boxed = new BoxedPromiseOrValue(promise); - - expect(boxed.value).to.equal(promise); - }); - - it('resets the boxed value when the passed promise resolves', async () => { - const promise = Promise.resolve(42); - const boxed = new BoxedPromiseOrValue(promise); - - await resolveOnNextTick(); - - expect(boxed.value).to.equal(42); - }); -});
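
For reference, the core pattern this patch introduces — execution groups modeled as memoizing Tasks, with follow-up work partitioned into groups, tasks, and streams by buildWorkFromIncrementalPayload — can be summarized with a short sketch. This is a minimal illustration, not the patch's actual signatures: the stand-in Task below has a single type parameter and no deferred fragment records, whereas the real Task in src/execution/WorkQueue.ts takes four type parameters and is constructed with both its executor and its deferredFragmentRecords; StreamLike and partitionWork are likewise hypothetical names for illustration only.

    // A minimal sketch, assuming a simplified single-parameter Task; the
    // real Task in src/execution/WorkQueue.ts is constructed with both its
    // executor and the deferred fragment records it belongs to.
    class Task<T> {
      private _result: Promise<T> | undefined;

      constructor(private readonly _executor: () => Promise<T> | T) {}

      // Begins execution on first call and memoizes the result, so calling
      // start() ahead of time (as the "started manually" tests above do)
      // is safe and idempotent.
      start(): Promise<T> {
        this._result ??= Promise.resolve().then(this._executor);
        return this._result;
      }
    }

    // A stand-in for StreamRecord: anything that is not a Task is a stream.
    interface StreamLike {
      items: AsyncIterable<unknown>;
    }

    type IncrementalRecordLike = Task<unknown> | StreamLike;

    // Mirrors buildWorkFromIncrementalPayload: a mixed list of incremental
    // data records is split into separate task and stream work lanes, using
    // the same instanceof check as isExecutionGroup() in types.ts.
    function partitionWork(records: ReadonlyArray<IncrementalRecordLike>): {
      tasks: Array<Task<unknown>>;
      streams: Array<StreamLike>;
    } {
      const tasks: Array<Task<unknown>> = [];
      const streams: Array<StreamLike> = [];
      for (const record of records) {
        if (record instanceof Task) {
          tasks.push(record);
        } else {
          streams.push(record);
        }
      }
      return { tasks, streams };
    }

    // Usage: one task and one stream are routed to their respective lanes,
    // and the task may be started eagerly, mirroring enableEarlyExecution.
    const task = new Task(() => 42);
    const stream: StreamLike = {
      items: (async function* () {
        yield 1;
      })(),
    };
    const { tasks, streams } = partitionWork([task, stream]);
    void task.start(); // eager start, as collectExecutionGroups does

Replacing BoxedPromiseOrValue with a memoizing Task keeps the thunk/eager split in one place: enableEarlyExecution now simply decides when start() is first called (immediately, or after one microtask when shouldDefer holds), rather than choosing between a boxed value and a thunk as the deleted ThunkIncrementalResult type did.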