Skip to content

Commit b12a184

Browse files
committed
fix(codex-adapter): surface structured output and stderr context
Addressed code review feedback to ensure Codex runs surface structured output schemas even for non-stream runs and expose worker stderr when errors bubble up. Updated the worker to capture structured payloads, and taught the adapter to prefer that data, buffer stderr, enrich deserialized errors, and emit cancellation/error stream events when workers exit unexpectedly. Verified via npm run test --workspace examples.
1 parent 04d99c5 commit b12a184

File tree

2 files changed

+170
-29
lines changed

2 files changed

+170
-29
lines changed

packages/codex-adapter/src/index.ts

Lines changed: 116 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const WORKER_PATH = fileURLToPath(new URL('./worker.js', import.meta.url));
2323
const SOFT_KILL_DELAY_MS = 250;
2424
const HARD_KILL_DELAY_MS = 1500;
2525
const DONE = Symbol('stream-done');
26+
const STDERR_BUFFER_LIMIT = 64 * 1024;
2627

2728
export const CODER_NAME: Provider = 'codex';
2829
export function createAdapter(defaults?: StartOpts): HeadlessCoder {
@@ -61,6 +62,7 @@ interface WorkerRunPayload {
6162
result: {
6263
items: any[];
6364
finalResponse: string;
65+
structured?: unknown;
6466
usage?: any;
6567
};
6668
}
@@ -70,6 +72,7 @@ interface SerializedError {
7072
stack?: string;
7173
name?: string;
7274
code?: string;
75+
stderr?: string;
7376
}
7477

7578
type WorkerMessage =
@@ -169,7 +172,8 @@ export class CodexAdapter implements HeadlessCoder {
169172
}
170173

171174
private launchRunWorker(state: CodexThreadState, input: string, opts?: RunOpts) {
172-
const child = fork(WORKER_PATH, { stdio: ['inherit', 'inherit', 'inherit', 'ipc'] });
175+
const child = fork(WORKER_PATH, { stdio: ['inherit', 'inherit', 'pipe', 'ipc'] });
176+
const stderr = collectChildStderr(child);
173177
const abortController = new AbortController();
174178
const stopExternal = linkSignal(opts?.signal, reason => {
175179
if (!abortController.signal.aborted) {
@@ -214,31 +218,33 @@ export class CodexAdapter implements HeadlessCoder {
214218
break;
215219
case 'aborted':
216220
detach();
217-
reject(createAbortError(raw.reason));
221+
reject(attachStderr(createAbortError(raw.reason), stderr.read()));
218222
break;
219223
case 'error':
220224
detach();
221-
reject(deserializeError(raw.error));
225+
reject(attachStderr(deserializeError(raw.error), stderr.read()));
222226
break;
223227
}
224228
});
225229

226230
child.once('exit', (code, signal) => {
227231
if (settled) return;
228232
detach();
233+
const stderrOutput = stderr.read();
229234
if (active.aborted || signal) {
230-
reject(createAbortError(active.abortReason));
235+
const reason = active.abortReason ?? (signal ? `Worker exited via signal ${signal}` : undefined);
236+
reject(attachStderr(createAbortError(reason), stderrOutput));
231237
} else if (code === 0) {
232-
reject(new Error('Codex worker exited unexpectedly.'));
238+
reject(createWorkerExitError('Codex worker exited before returning a result.', stderrOutput));
233239
} else {
234-
reject(new Error(`Codex worker exited with code ${code}`));
240+
reject(createWorkerExitError(`Codex worker exited with code ${code}`, stderrOutput));
235241
}
236242
});
237243

238244
child.once('error', error => {
239245
if (settled) return;
240246
detach();
241-
reject(error);
247+
reject(attachStderr(error as Error, stderr.read()));
242248
});
243249

244250
child.send({ type: 'run', payload: request });
@@ -250,6 +256,7 @@ export class CodexAdapter implements HeadlessCoder {
250256
}
251257
this.clearKillTimers(active);
252258
active.stopExternal();
259+
stderr.cleanup();
253260
try {
254261
child.removeAllListeners();
255262
} catch {
@@ -269,7 +276,8 @@ export class CodexAdapter implements HeadlessCoder {
269276
input: string,
270277
opts?: RunOpts,
271278
): EventIterator {
272-
const child = fork(WORKER_PATH, { stdio: ['inherit', 'inherit', 'inherit', 'ipc'] });
279+
const child = fork(WORKER_PATH, { stdio: ['inherit', 'inherit', 'pipe', 'ipc'] });
280+
const stderr = collectChildStderr(child);
273281
const abortController = new AbortController();
274282
const stopExternal = linkSignal(opts?.signal, reason => {
275283
if (!abortController.signal.aborted) {
@@ -341,59 +349,60 @@ export class CodexAdapter implements HeadlessCoder {
341349
}
342350
case 'cancelled': {
343351
const reason = raw.reason ?? 'Interrupted';
344-
push({
345-
type: 'cancelled',
346-
provider: CODER_NAME,
347-
ts: now(),
348-
originalItem: { reason },
349-
});
352+
push(createCancelledEvent(reason));
350353
push(createInterruptedErrorEvent(reason));
351354
finished = true;
352355
push(DONE);
353356
break;
354357
}
355358
case 'aborted': {
356-
push(createAbortError(raw.reason));
359+
push(attachStderr(createAbortError(raw.reason), stderr.read()));
357360
finished = true;
358361
push(DONE);
359362
break;
360363
}
361364
case 'error': {
362-
push(deserializeError(raw.error));
365+
push(attachStderr(deserializeError(raw.error), stderr.read()));
363366
finished = true;
364367
push(DONE);
365368
break;
366369
}
367370
}
368371
});
369372

370-
child.once('exit', code => {
373+
child.once('exit', (code, signal) => {
371374
if (finished) return;
372375
finished = true;
376+
const stderrOutput = stderr.read();
373377
if (active.aborted) {
374378
const reason = active.abortReason ?? 'Interrupted';
375-
push({
376-
type: 'cancelled',
377-
provider: CODER_NAME,
378-
ts: now(),
379-
originalItem: { reason },
380-
});
379+
push(createCancelledEvent(reason, stderrOutput));
381380
push(createInterruptedErrorEvent(reason));
382381
push(DONE);
383382
return;
384383
}
385-
if (code === 0) {
384+
if (signal) {
385+
const reason = `Codex worker exited via signal ${signal}`;
386+
push(createCancelledEvent(reason, stderrOutput));
387+
push(createWorkerExitErrorEvent(reason, stderrOutput));
386388
push(DONE);
387-
} else {
388-
push(new Error(`Codex worker exited with code ${code}`));
389+
return;
390+
}
391+
if (code === 0) {
392+
const reason = 'Codex worker exited before completing the stream.';
393+
push(createCancelledEvent(reason, stderrOutput));
394+
push(createWorkerExitErrorEvent(reason, stderrOutput));
389395
push(DONE);
396+
return;
390397
}
398+
push(createWorkerExitError(`Codex worker exited with code ${code}`, stderrOutput));
399+
push(DONE);
391400
});
392401

393402
child.once('error', error => {
394403
if (finished) return;
395404
finished = true;
396-
push(error);
405+
push(attachStderr(error as Error, stderr.read()));
397406
push(DONE);
398407
});
399408

@@ -422,6 +431,7 @@ export class CodexAdapter implements HeadlessCoder {
422431
if (state.currentRun === active) {
423432
state.currentRun = null;
424433
}
434+
stderr.cleanup();
425435
if (!child.killed && child.exitCode === null) {
426436
child.kill('SIGTERM');
427437
}
@@ -480,7 +490,8 @@ export class CodexAdapter implements HeadlessCoder {
480490

481491
private mapRunResult(payload: WorkerRunPayload): RunResult {
482492
const finalResponse = payload.result.finalResponse ?? '';
483-
const structured = extractJsonPayload(finalResponse);
493+
const structured =
494+
payload.result.structured === undefined ? extractJsonPayload(finalResponse) : payload.result.structured;
484495
return {
485496
threadId: payload.threadId,
486497
text: finalResponse || undefined,
@@ -676,6 +687,7 @@ function deserializeError(serialized: SerializedError): Error {
676687
if (serialized.stack) error.stack = serialized.stack;
677688
if (serialized.name) error.name = serialized.name;
678689
if (serialized.code) (error as any).code = serialized.code;
690+
if (serialized.stderr) (error as any).stderr = serialized.stderr;
679691
return error;
680692
}
681693

@@ -695,3 +707,79 @@ function reasonToString(reason: unknown): string | undefined {
695707
if (reason instanceof Error && reason.message) return reason.message;
696708
return undefined;
697709
}
710+
711+
function collectChildStderr(child: ChildProcess, limit = STDERR_BUFFER_LIMIT): {
712+
read(): string;
713+
cleanup(): void;
714+
} {
715+
const chunks: Buffer[] = [];
716+
let size = 0;
717+
const onData = (chunk: Buffer | string) => {
718+
if (size >= limit) return;
719+
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
720+
const remaining = limit - size;
721+
if (buffer.length <= remaining) {
722+
chunks.push(buffer);
723+
size += buffer.length;
724+
} else {
725+
chunks.push(buffer.subarray(0, remaining));
726+
size = limit;
727+
}
728+
};
729+
child.stderr?.on('data', onData);
730+
return {
731+
read(): string {
732+
if (!chunks.length) return '';
733+
return Buffer.concat(chunks).toString('utf8').trim();
734+
},
735+
cleanup(): void {
736+
if (!child.stderr) return;
737+
child.stderr.removeListener('data', onData);
738+
},
739+
};
740+
}
741+
742+
function attachStderr<T extends Error>(error: T, stderr: string | undefined): T {
743+
if (stderr && !(error as any).stderr) {
744+
(error as any).stderr = stderr;
745+
}
746+
return error;
747+
}
748+
749+
function createWorkerExitError(message: string, stderr?: string): Error {
750+
const formatted = formatWorkerExitMessage(message, stderr);
751+
const error = new Error(formatted);
752+
if (stderr) (error as any).stderr = stderr;
753+
return error;
754+
}
755+
756+
function createWorkerExitErrorEvent(message: string, stderr?: string): CoderStreamEvent {
757+
const original: { reason: string; stderr?: string } = { reason: message };
758+
if (stderr) original.stderr = stderr;
759+
return {
760+
type: 'error',
761+
provider: CODER_NAME,
762+
code: 'codex.worker_exit',
763+
message,
764+
ts: now(),
765+
originalItem: original,
766+
};
767+
}
768+
769+
function createCancelledEvent(reason: string, stderr?: string): CoderStreamEvent {
770+
const original: { reason: string; stderr?: string } = { reason };
771+
if (stderr) original.stderr = stderr;
772+
return {
773+
type: 'cancelled',
774+
provider: CODER_NAME,
775+
ts: now(),
776+
originalItem: original,
777+
};
778+
}
779+
780+
function formatWorkerExitMessage(base: string, stderr?: string): string {
781+
if (!stderr) return base;
782+
const trimmed = stderr.trim();
783+
if (!trimmed) return base;
784+
return `${base}: ${trimmed}`;
785+
}

packages/codex-adapter/src/worker.ts

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ interface WorkerRunResult {
3838
result: {
3939
items: any[];
4040
finalResponse: string;
41+
structured?: unknown;
4142
usage?: any;
4243
};
4344
}
@@ -47,6 +48,7 @@ interface SerializedError {
4748
stack?: string;
4849
name?: string;
4950
code?: string;
51+
stderr?: string;
5052
}
5153

5254
let abortRequested = false;
@@ -121,6 +123,7 @@ async function consumeEvents(thread: Thread, payload: WorkerRequest, emitEvents:
121123
const items: any[] = [];
122124
let finalResponse = '';
123125
let usage: any = undefined;
126+
let structured: unknown = undefined;
124127
for await (const event of run.events) {
125128
if (abortRequested) {
126129
throw createAbortError(abortReason);
@@ -133,15 +136,24 @@ async function consumeEvents(thread: Thread, payload: WorkerRequest, emitEvents:
133136
if (event.item.type === 'agent_message' && typeof event.item.text === 'string') {
134137
finalResponse = event.item.text;
135138
}
139+
if (structured === undefined) {
140+
structured = extractStructuredFromItem(event.item);
141+
}
136142
} else if (event.type === 'turn.completed') {
137143
usage = event.usage;
144+
if (structured === undefined) {
145+
structured = extractStructuredFromTurn(event);
146+
}
138147
} else if (event.type === 'turn.failed') {
139148
const message = event.error?.message ?? 'Codex turn failed';
140149
const error = new Error(message);
141150
throw error;
142151
}
143152
}
144-
return { items, finalResponse, usage };
153+
if (payload.outputSchema && structured === undefined) {
154+
structured = extractJsonPayload(finalResponse);
155+
}
156+
return { items, finalResponse, structured, usage };
145157
}
146158

147159
async function handleWorkerError(error: unknown): Promise<void> {
@@ -166,11 +178,52 @@ function serializeError(error: unknown): SerializedError {
166178
stack: error.stack,
167179
name: error.name,
168180
code: (error as any).code,
181+
stderr: (error as any).stderr,
169182
};
170183
}
171184
return { message: typeof error === 'string' ? error : 'Unknown error' };
172185
}
173186

187+
function extractStructuredFromItem(item: any): unknown {
188+
if (!item) return undefined;
189+
return firstStructured([
190+
item.output_json,
191+
item.json,
192+
item.output,
193+
item.response_json,
194+
item.structured,
195+
item.data,
196+
]);
197+
}
198+
199+
function extractStructuredFromTurn(event: any): unknown {
200+
if (!event) return undefined;
201+
return firstStructured([event.output_json, event.json, event.result, event.output, event.response_json]);
202+
}
203+
204+
function firstStructured(candidates: unknown[]): unknown {
205+
for (const candidate of candidates) {
206+
if (candidate && typeof candidate === 'object') {
207+
return candidate;
208+
}
209+
}
210+
return undefined;
211+
}
212+
213+
function extractJsonPayload(text: string | undefined): unknown | undefined {
214+
if (!text) return undefined;
215+
const fenced = text.match(/```json\s*([\s\S]+?)```/i);
216+
const candidate = (fenced ? fenced[1] : text).trim();
217+
const start = candidate.indexOf('{');
218+
const end = candidate.lastIndexOf('}');
219+
if (start === -1 || end === -1 || end < start) return undefined;
220+
try {
221+
return JSON.parse(candidate.slice(start, end + 1));
222+
} catch {
223+
return undefined;
224+
}
225+
}
226+
174227
function createAbortError(reason?: string): Error {
175228
const error = new Error(reason ?? 'Operation was interrupted');
176229
error.name = 'AbortError';

0 commit comments

Comments
 (0)