@@ -11,11 +11,15 @@ import (
1111 "time"
1212
1313 "github.com/anthropics/anthropic-sdk-go"
14+ "github.com/anthropics/anthropic-sdk-go/packages/ssestream"
1415 "github.com/anthropics/anthropic-sdk-go/shared/constant"
1516 "github.com/coder/aibridge/mcp"
17+ "github.com/coder/aibridge/tracing"
1618 "github.com/google/uuid"
1719 mcplib "github.com/mark3labs/mcp-go/mcp"
1820 "github.com/tidwall/sjson"
21+ "go.opentelemetry.io/otel/attribute"
22+ "go.opentelemetry.io/otel/trace"
1923
2024 "cdr.dev/slog"
2125)
@@ -26,12 +30,13 @@ type AnthropicMessagesStreamingInterception struct {
2630 AnthropicMessagesInterceptionBase
2731}
2832
29- func NewAnthropicMessagesStreamingInterception (id uuid.UUID , req * MessageNewParamsWrapper , cfg AnthropicConfig , bedrockCfg * AWSBedrockConfig ) * AnthropicMessagesStreamingInterception {
33+ func NewAnthropicMessagesStreamingInterception (id uuid.UUID , req * MessageNewParamsWrapper , cfg AnthropicConfig , bedrockCfg * AWSBedrockConfig , tracer trace. Tracer ) * AnthropicMessagesStreamingInterception {
3034 return & AnthropicMessagesStreamingInterception {AnthropicMessagesInterceptionBase : AnthropicMessagesInterceptionBase {
3135 id : id ,
3236 req : req ,
3337 cfg : cfg ,
3438 bedrockCfg : bedrockCfg ,
39+ tracer : tracer ,
3540 }}
3641}
3742
@@ -43,6 +48,10 @@ func (s *AnthropicMessagesStreamingInterception) Streaming() bool {
4348 return true
4449}
4550
51+ func (s * AnthropicMessagesStreamingInterception ) TraceAttributes (r * http.Request ) []attribute.KeyValue {
52+ return s .AnthropicMessagesInterceptionBase .baseTraceAttributes (r , true )
53+ }
54+
4655// ProcessRequest handles a request to /v1/messages.
4756// This API has a state-machine behind it, which is described in https://docs.claude.com/en/docs/build-with-claude/streaming#event-types.
4857//
@@ -62,13 +71,16 @@ func (s *AnthropicMessagesStreamingInterception) Streaming() bool {
6271// b) if the tool is injected, it will be invoked by the [mcp.ServerProxier] in the remote MCP server, and its
6372// results relayed to the SERVER. The response from the server will be handled synchronously, and this loop
6473// can continue until all injected tool invocations are completed and the response is relayed to the client.
65- func (i * AnthropicMessagesStreamingInterception ) ProcessRequest (w http.ResponseWriter , r * http.Request ) error {
74+ func (i * AnthropicMessagesStreamingInterception ) ProcessRequest (w http.ResponseWriter , r * http.Request ) ( outErr error ) {
6675 if i .req == nil {
6776 return fmt .Errorf ("developer error: req is nil" )
6877 }
6978
79+ ctx , span := i .tracer .Start (r .Context (), "Intercept.ProcessRequest" , trace .WithAttributes (tracing .InterceptionAttributesFromContext (r .Context ())... ))
80+ defer tracing .EndSpanErr (span , & outErr )
81+
7082 // Allow us to interrupt watch via cancel.
71- ctx , cancel := context .WithCancel (r . Context () )
83+ ctx , cancel := context .WithCancel (ctx )
7284 defer cancel ()
7385 r = r .WithContext (ctx ) // Rewire context for SSE cancellation.
7486
@@ -118,12 +130,13 @@ func (i *AnthropicMessagesStreamingInterception) ProcessRequest(w http.ResponseW
118130 isFirst := true
119131newStream:
120132 for {
133+ // TODO add outer loop span (https://github.com/coder/aibridge/issues/67)
121134 if err := streamCtx .Err (); err != nil {
122135 lastErr = fmt .Errorf ("stream exit: %w" , err )
123136 break
124137 }
125138
126- stream := svc . NewStreaming (streamCtx , messages )
139+ stream := i . newStream (streamCtx , svc , messages )
127140
128141 var message anthropic.Message
129142 var lastToolName string
@@ -270,7 +283,7 @@ newStream:
270283 continue
271284 }
272285
273- res , err := tool .Call (streamCtx , input )
286+ res , err := tool .Call (streamCtx , input , i . tracer )
274287
275288 _ = i .recorder .RecordToolUsage (streamCtx , & ToolUsageRecord {
276289 InterceptionID : i .ID ().String (),
@@ -522,3 +535,11 @@ func (s *AnthropicMessagesStreamingInterception) encodeForStream(payload []byte,
522535 buf .WriteString ("\n \n " )
523536 return buf .Bytes ()
524537}
538+
539+ // newStream traces svc.NewStreaming(streamCtx, messages)
540+ func (s * AnthropicMessagesStreamingInterception ) newStream (ctx context.Context , svc anthropic.MessageService , messages anthropic.MessageNewParams ) * ssestream.Stream [anthropic.MessageStreamEventUnion ] {
541+ _ , span := s .tracer .Start (ctx , "Intercept.ProcessRequest.Upstream" , trace .WithAttributes (tracing .InterceptionAttributesFromContext (ctx )... ))
542+ defer span .End ()
543+
544+ return svc .NewStreaming (ctx , messages )
545+ }
0 commit comments