Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 114 additions & 71 deletions packages/core/src/tracing/openai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { getClient } from '../../currentScopes';
import { captureException } from '../../exports';
import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes';
import { SPAN_STATUS_ERROR } from '../../tracing';
import { startSpan, startSpanManual } from '../../tracing/trace';
import { startInactiveSpan, startSpanManual } from '../../tracing/trace';
import type { Span, SpanAttributeValue } from '../../types-hoist/span';
import { isThenable } from '../../utils/is';
import {
GEN_AI_OPERATION_NAME_ATTRIBUTE,
GEN_AI_REQUEST_AVAILABLE_TOOLS_ATTRIBUTE,
Expand Down Expand Up @@ -126,83 +127,117 @@ function addRequestAttributes(span: Span, params: Record<string, unknown>): void
}
}

/**
 * Shared error path for streaming requests: reports the error to Sentry,
 * marks the span as failed (when it is still open), and rethrows so the
 * caller's promise rejects as it would have without instrumentation.
 */
function handleStreamingError(error: unknown, span: Span, methodPath: string): never {
  const mechanism = {
    handled: false,
    type: 'auto.ai.openai.stream',
    data: { function: methodPath },
  };
  captureException(error, { mechanism });

  // Only touch the span while it is still recording, so a span that was
  // already ended elsewhere is left untouched.
  if (span.isRecording()) {
    span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
    span.end();
  }
  throw error;
}

/**
* Instrument a method with Sentry spans
* Following Sentry AI Agents Manual Instrumentation conventions
* @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation
*
* This implementation uses Proxy and startInactiveSpan to preserve the original
* return type (e.g., OpenAI's APIPromise with .withResponse() method).
*/
function instrumentMethod<T extends unknown[], R>(
originalMethod: (...args: T) => Promise<R>,
originalMethod: (...args: T) => R | Promise<R>,
methodPath: InstrumentedMethod,
context: unknown,
options: OpenAiOptions,
): (...args: T) => Promise<R> {
return async function instrumentedMethod(...args: T): Promise<R> {
const requestAttributes = extractRequestAttributes(args, methodPath);
const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
const operationName = getOperationName(methodPath);

const params = args[0] as Record<string, unknown> | undefined;
const isStreamRequested = params && typeof params === 'object' && params.stream === true;

if (isStreamRequested) {
// For streaming responses, use manual span management to properly handle the async generator lifecycle
return startSpanManual(
{
name: `${operationName} ${model} stream-response`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async (span: Span) => {
try {
if (options.recordInputs && params) {
addRequestAttributes(span, params);
}
): (...args: T) => R | Promise<R> {
return new Proxy(originalMethod, {
apply(target, _thisArg, args: T): R | Promise<R> {
const requestAttributes = extractRequestAttributes(args, methodPath);
const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
const operationName = getOperationName(methodPath);

const result = await originalMethod.apply(context, args);
const params = args[0] as Record<string, unknown> | undefined;
const isStreamRequested = params && typeof params === 'object' && params.stream === true;

return instrumentStream(
result as OpenAIStream<ChatCompletionChunk | ResponseStreamingEvent>,
span,
options.recordOutputs ?? false,
) as unknown as R;
} catch (error) {
// For streaming requests that fail before stream creation, we still want to record
// them as streaming requests but end the span gracefully
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
captureException(error, {
mechanism: {
handled: false,
type: 'auto.ai.openai.stream',
data: {
function: methodPath,
},
},
});
span.end();
throw error;
}
},
);
} else {
// Non-streaming responses
return startSpan(
{
name: `${operationName} ${model}`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async (span: Span) => {
try {
if (options.recordInputs && params) {
addRequestAttributes(span, params);
if (isStreamRequested) {
// For streaming responses, use manual span management to properly handle the async generator lifecycle
return startSpanManual(
{
name: `${operationName} ${model} stream-response`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async (span: Span) => {
try {
if (options.recordInputs && params) {
addRequestAttributes(span, params);
}

const result = await target.apply(context, args);

return instrumentStream(
result as OpenAIStream<ChatCompletionChunk | ResponseStreamingEvent>,
span,
options.recordOutputs ?? false,
) as unknown as R;
} catch (error) {
return handleStreamingError(error, span, methodPath);
}
},
);
}

// Non-streaming responses: use startInactiveSpan to preserve original return type
// (e.g., OpenAI's APIPromise with .withResponse())
//
// We use startInactiveSpan instead of startSpan/startSpanManual because those
// internally use handleCallbackErrors which calls .then() on Promises, creating
// a new Promise instance and losing APIPromise's custom methods like .withResponse().
const span = startInactiveSpan({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried using start span here?
I understand that your purpose here is to use handleCallbackErrors and handle promises, you can still achieve this with start span, and as long as we didn't end the span in a different scope/context, it should work fine.

name: `${operationName} ${model}`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
});

if (options.recordInputs && params) {
addRequestAttributes(span, params);
}

This comment was marked as outdated.


// Handle synchronous exceptions from the API call
let result: R | Promise<R>;
try {
result = target.apply(context, args);
} catch (err) {
captureException(err, {
mechanism: {
handled: false,
type: 'auto.ai.openai',
data: {
function: methodPath,
},
},
});
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
span.end();
throw err;
}

const result = await originalMethod.apply(context, args);
addResponseAttributes(span, result, options.recordOutputs);
return result;
} catch (error) {
captureException(error, {
// Attach side-effect handlers without transforming the Promise
// This preserves the original APIPromise type and its methods like .withResponse()
if (isThenable(result)) {
Promise.resolve(result).then(
res => {
addResponseAttributes(span, res as OpenAiResponse, options.recordOutputs);
span.end();
},
err => {
captureException(err, {
mechanism: {
handled: false,
type: 'auto.ai.openai',
Expand All @@ -211,12 +246,20 @@ function instrumentMethod<T extends unknown[], R>(
},
},
});
throw error;
}
},
);
}
};
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
span.end();
},
);
} else {
// Synchronous result (unlikely for OpenAI API but handle it)
addResponseAttributes(span, result as OpenAiResponse, options.recordOutputs);
span.end();
}

// Return the original Promise (APIPromise) with all its methods intact
return result;
},
}) as (...args: T) => R | Promise<R>;
}

/**
Expand Down
Loading