diff --git a/Makefile b/Makefile index a88ba22de..13fa26ca6 100644 --- a/Makefile +++ b/Makefile @@ -32,10 +32,20 @@ QUIET_REDIRECT := >/dev/null 2>&1 endif # Image tags -FRONTEND_IMAGE ?= vteam-frontend:latest -BACKEND_IMAGE ?= vteam-backend:latest -OPERATOR_IMAGE ?= vteam-operator:latest -RUNNER_IMAGE ?= vteam-claude-runner:latest +FRONTEND_IMAGE ?= vteam_frontend:latest +BACKEND_IMAGE ?= vteam_backend:latest +OPERATOR_IMAGE ?= vteam_operator:latest +RUNNER_IMAGE ?= vteam_claude_runner:latest + +# Build metadata (captured at build time) +GIT_COMMIT := $(shell git rev-parse HEAD 2>/dev/null || echo "unknown") +GIT_COMMIT_SHORT := $(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown") +GIT_BRANCH := $(shell git rev-parse --abbrev-ref HEAD 2>/dev/null || echo "unknown") +GIT_REPO := $(shell git remote get-url origin 2>/dev/null || echo "local") +GIT_DIRTY := $(shell git diff --quiet 2>/dev/null || echo "-dirty") +GIT_VERSION := $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev") +BUILD_DATE := $(shell date -u +"%Y-%m-%dT%H:%M:%SZ") +BUILD_USER := $(shell whoami)@$(shell hostname) # Colors for output COLOR_RESET := \033[0m @@ -85,22 +95,54 @@ build-all: build-frontend build-backend build-operator build-runner ## Build all build-frontend: ## Build frontend image @echo "$(COLOR_BLUE)▶$(COLOR_RESET) Building frontend with $(CONTAINER_ENGINE)..." - @cd components/frontend && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) -t $(FRONTEND_IMAGE) . + @echo " Git: $(GIT_BRANCH)@$(GIT_COMMIT_SHORT)$(GIT_DIRTY)" + @cd components/frontend && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) \ + --build-arg GIT_COMMIT=$(GIT_COMMIT) \ + --build-arg GIT_BRANCH=$(GIT_BRANCH) \ + --build-arg GIT_REPO=$(GIT_REPO) \ + --build-arg GIT_VERSION=$(GIT_VERSION)$(GIT_DIRTY) \ + --build-arg BUILD_DATE=$(BUILD_DATE) \ + --build-arg BUILD_USER=$(BUILD_USER) \ + -t $(FRONTEND_IMAGE) . @echo "$(COLOR_GREEN)✓$(COLOR_RESET) Frontend built: $(FRONTEND_IMAGE)" build-backend: ## Build backend image @echo "$(COLOR_BLUE)▶$(COLOR_RESET) Building backend with $(CONTAINER_ENGINE)..." - @cd components/backend && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) -t $(BACKEND_IMAGE) . + @echo " Git: $(GIT_BRANCH)@$(GIT_COMMIT_SHORT)$(GIT_DIRTY)" + @cd components/backend && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) \ + --build-arg GIT_COMMIT=$(GIT_COMMIT) \ + --build-arg GIT_BRANCH=$(GIT_BRANCH) \ + --build-arg GIT_REPO=$(GIT_REPO) \ + --build-arg GIT_VERSION=$(GIT_VERSION)$(GIT_DIRTY) \ + --build-arg BUILD_DATE=$(BUILD_DATE) \ + --build-arg BUILD_USER=$(BUILD_USER) \ + -t $(BACKEND_IMAGE) . @echo "$(COLOR_GREEN)✓$(COLOR_RESET) Backend built: $(BACKEND_IMAGE)" build-operator: ## Build operator image @echo "$(COLOR_BLUE)▶$(COLOR_RESET) Building operator with $(CONTAINER_ENGINE)..." - @cd components/operator && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) -t $(OPERATOR_IMAGE) . + @echo " Git: $(GIT_BRANCH)@$(GIT_COMMIT_SHORT)$(GIT_DIRTY)" + @cd components/operator && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) \ + --build-arg GIT_COMMIT=$(GIT_COMMIT) \ + --build-arg GIT_BRANCH=$(GIT_BRANCH) \ + --build-arg GIT_REPO=$(GIT_REPO) \ + --build-arg GIT_VERSION=$(GIT_VERSION)$(GIT_DIRTY) \ + --build-arg BUILD_DATE=$(BUILD_DATE) \ + --build-arg BUILD_USER=$(BUILD_USER) \ + -t $(OPERATOR_IMAGE) . @echo "$(COLOR_GREEN)✓$(COLOR_RESET) Operator built: $(OPERATOR_IMAGE)" build-runner: ## Build Claude Code runner image @echo "$(COLOR_BLUE)▶$(COLOR_RESET) Building runner with $(CONTAINER_ENGINE)..." - @cd components/runners && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) -t $(RUNNER_IMAGE) -f claude-code-runner/Dockerfile . + @echo " Git: $(GIT_BRANCH)@$(GIT_COMMIT_SHORT)$(GIT_DIRTY)" + @cd components/runners && $(CONTAINER_ENGINE) build $(PLATFORM_FLAG) $(BUILD_FLAGS) \ + --build-arg GIT_COMMIT=$(GIT_COMMIT) \ + --build-arg GIT_BRANCH=$(GIT_BRANCH) \ + --build-arg GIT_REPO=$(GIT_REPO) \ + --build-arg GIT_VERSION=$(GIT_VERSION)$(GIT_DIRTY) \ + --build-arg BUILD_DATE=$(BUILD_DATE) \ + --build-arg BUILD_USER=$(BUILD_USER) \ + -t $(RUNNER_IMAGE) -f claude-code-runner/Dockerfile . @echo "$(COLOR_GREEN)✓$(COLOR_RESET) Runner built: $(RUNNER_IMAGE)" ##@ Git Hooks @@ -236,7 +278,15 @@ local-rebuild: ## Rebuild and reload all components local-reload-backend: ## Rebuild and reload backend only @echo "$(COLOR_BLUE)▶$(COLOR_RESET) Rebuilding backend..." - @cd components/backend && $(CONTAINER_ENGINE) build -t $(BACKEND_IMAGE) . >/dev/null 2>&1 + @echo " Git: $(GIT_BRANCH)@$(GIT_COMMIT_SHORT)$(GIT_DIRTY)" + @cd components/backend && $(CONTAINER_ENGINE) build -t $(BACKEND_IMAGE) \ + --build-arg GIT_COMMIT=$(GIT_COMMIT) \ + --build-arg GIT_BRANCH=$(GIT_BRANCH) \ + --build-arg GIT_REPO=$(GIT_REPO) \ + --build-arg GIT_VERSION=$(GIT_VERSION)$(GIT_DIRTY) \ + --build-arg BUILD_DATE=$(BUILD_DATE) \ + --build-arg BUILD_USER=$(BUILD_USER) \ + . >/dev/null 2>&1 @$(CONTAINER_ENGINE) tag $(BACKEND_IMAGE) localhost/$(BACKEND_IMAGE) 2>/dev/null || true @$(CONTAINER_ENGINE) save -o /tmp/backend-reload.tar localhost/$(BACKEND_IMAGE) @minikube image load /tmp/backend-reload.tar >/dev/null 2>&1 @@ -259,7 +309,15 @@ local-reload-backend: ## Rebuild and reload backend only local-reload-frontend: ## Rebuild and reload frontend only @echo "$(COLOR_BLUE)▶$(COLOR_RESET) Rebuilding frontend..." - @cd components/frontend && $(CONTAINER_ENGINE) build -t $(FRONTEND_IMAGE) . >/dev/null 2>&1 + @echo " Git: $(GIT_BRANCH)@$(GIT_COMMIT_SHORT)$(GIT_DIRTY)" + @cd components/frontend && $(CONTAINER_ENGINE) build -t $(FRONTEND_IMAGE) \ + --build-arg GIT_COMMIT=$(GIT_COMMIT) \ + --build-arg GIT_BRANCH=$(GIT_BRANCH) \ + --build-arg GIT_REPO=$(GIT_REPO) \ + --build-arg GIT_VERSION=$(GIT_VERSION)$(GIT_DIRTY) \ + --build-arg BUILD_DATE=$(BUILD_DATE) \ + --build-arg BUILD_USER=$(BUILD_USER) \ + . >/dev/null 2>&1 @$(CONTAINER_ENGINE) tag $(FRONTEND_IMAGE) localhost/$(FRONTEND_IMAGE) 2>/dev/null || true @$(CONTAINER_ENGINE) save -o /tmp/frontend-reload.tar localhost/$(FRONTEND_IMAGE) @minikube image load /tmp/frontend-reload.tar >/dev/null 2>&1 @@ -283,7 +341,15 @@ local-reload-frontend: ## Rebuild and reload frontend only local-reload-operator: ## Rebuild and reload operator only @echo "$(COLOR_BLUE)▶$(COLOR_RESET) Rebuilding operator..." - @cd components/operator && $(CONTAINER_ENGINE) build -t $(OPERATOR_IMAGE) . >/dev/null 2>&1 + @echo " Git: $(GIT_BRANCH)@$(GIT_COMMIT_SHORT)$(GIT_DIRTY)" + @cd components/operator && $(CONTAINER_ENGINE) build -t $(OPERATOR_IMAGE) \ + --build-arg GIT_COMMIT=$(GIT_COMMIT) \ + --build-arg GIT_BRANCH=$(GIT_BRANCH) \ + --build-arg GIT_REPO=$(GIT_REPO) \ + --build-arg GIT_VERSION=$(GIT_VERSION)$(GIT_DIRTY) \ + --build-arg BUILD_DATE=$(BUILD_DATE) \ + --build-arg BUILD_USER=$(BUILD_USER) \ + . >/dev/null 2>&1 @$(CONTAINER_ENGINE) tag $(OPERATOR_IMAGE) localhost/$(OPERATOR_IMAGE) 2>/dev/null || true @$(CONTAINER_ENGINE) save -o /tmp/operator-reload.tar localhost/$(OPERATOR_IMAGE) @minikube image load /tmp/operator-reload.tar >/dev/null 2>&1 diff --git a/components/backend/Dockerfile b/components/backend/Dockerfile index e186bdfb2..27966fb60 100644 --- a/components/backend/Dockerfile +++ b/components/backend/Dockerfile @@ -1,6 +1,14 @@ # Build stage FROM registry.access.redhat.com/ubi9/go-toolset:1.24 AS builder +# Build arguments for metadata +ARG GIT_COMMIT=unknown +ARG GIT_BRANCH=unknown +ARG GIT_REPO=unknown +ARG GIT_VERSION=unknown +ARG BUILD_DATE=unknown +ARG BUILD_USER=unknown + WORKDIR /app USER 0 @@ -14,21 +22,52 @@ RUN go mod download # Copy the source code COPY . . -# Build the application (with flags to avoid segfault) -RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o main . +# Build the application with embedded version info +# The -X flag injects build-time variables into the binary +# This ensures git metadata is baked into the binary itself, not just ENV vars +RUN CGO_ENABLED=0 GOOS=linux go build \ + -ldflags="-s -w \ + -X main.GitCommit=${GIT_COMMIT} \ + -X main.GitBranch=${GIT_BRANCH} \ + -X main.GitVersion=${GIT_VERSION} \ + -X main.BuildDate=${BUILD_DATE}" \ + -o main . # Final stage FROM registry.access.redhat.com/ubi9/ubi-minimal:latest +# Build arguments (need to redeclare for final stage) +ARG GIT_COMMIT=unknown +ARG GIT_BRANCH=unknown +ARG GIT_REPO=unknown +ARG GIT_VERSION=unknown +ARG BUILD_DATE=unknown +ARG BUILD_USER=unknown + +# Add labels to force cache invalidation and provide metadata +LABEL git.commit="${GIT_COMMIT}" +LABEL git.branch="${GIT_BRANCH}" +LABEL git.version="${GIT_VERSION}" +LABEL build.date="${BUILD_DATE}" +LABEL build.user="${BUILD_USER}" + RUN microdnf install -y git && microdnf clean all WORKDIR /app -# Copy the binary from builder stage +# Copy the binary from builder stage (binary has metadata embedded via ldflags) COPY --from=builder /app/main . # Default agents directory ENV AGENTS_DIR=/app/agents +# Build metadata as environment variables (fallback, primary source is embedded in binary) +ENV GIT_COMMIT=${GIT_COMMIT} +ENV GIT_BRANCH=${GIT_BRANCH} +ENV GIT_REPO=${GIT_REPO} +ENV GIT_VERSION=${GIT_VERSION} +ENV BUILD_DATE=${BUILD_DATE} +ENV BUILD_USER=${BUILD_USER} + # Set executable permissions and make accessible to any user RUN chmod +x ./main && chmod 775 /app diff --git a/components/backend/go.mod b/components/backend/go.mod index d2abbabdb..2cf62958c 100644 --- a/components/backend/go.mod +++ b/components/backend/go.mod @@ -10,7 +10,6 @@ require ( github.com/gin-gonic/gin v1.10.1 github.com/golang-jwt/jwt/v5 v5.3.0 github.com/google/uuid v1.6.0 - github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/joho/godotenv v1.5.1 github.com/onsi/ginkgo/v2 v2.27.3 github.com/onsi/gomega v1.38.3 diff --git a/components/backend/go.sum b/components/backend/go.sum index 3c35fe618..d92b7491c 100644 --- a/components/backend/go.sum +++ b/components/backend/go.sum @@ -116,8 +116,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= -github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= -github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 111d97bb3..0e359f930 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -41,7 +41,7 @@ var ( DynamicClient dynamic.Interface GetGitHubToken func(context.Context, kubernetes.Interface, dynamic.Interface, string, string) (string, error) DeriveRepoFolderFromURL func(string) string - SendMessageToSession func(string, string, map[string]interface{}) + // LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket ) const runnerTokenRefreshedAtAnnotation = "ambient-code.io/token-refreshed-at" @@ -2065,6 +2065,10 @@ func StartSession(c *gin.Context) { if spec, ok := updated.Object["spec"].(map[string]interface{}); ok { session.Spec = parseSpec(spec) + + // NOTE: INITIAL_PROMPT auto-execution handled by runner on startup + // Runner POSTs to /agui/run when ready, events flow through backend + // This works for both UI and headless/API usage } if status, ok := updated.Object["status"].(map[string]interface{}); ok { @@ -3898,3 +3902,7 @@ func GitListBranchesSession(c *gin.Context) { } c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), bodyBytes) } + +// NOTE: autoTriggerInitialPrompt removed - runner handles INITIAL_PROMPT auto-execution +// Runner POSTs to backend's /agui/run when ready, events flow through middleware +// See: components/runners/claude-code-runner/main.py auto_execute_initial_prompt() diff --git a/components/backend/handlers/test_helpers_test.go b/components/backend/handlers/test_helpers_test.go index 02f70c6c0..da4c616ce 100644 --- a/components/backend/handlers/test_helpers_test.go +++ b/components/backend/handlers/test_helpers_test.go @@ -51,9 +51,6 @@ func SetupHandlerDependencies(k8sUtils *test_utils.K8sTestUtils) { } return "repo" } - SendMessageToSession = func(sessionID, userID string, message map[string]interface{}) { - // no-op in unit tests - } logger.Log("Handler dependencies set up with fake clients") } diff --git a/components/backend/main.go b/components/backend/main.go index d9936ae5c..ac7b4465d 100644 --- a/components/backend/main.go +++ b/components/backend/main.go @@ -15,11 +15,43 @@ import ( "github.com/joho/godotenv" ) +// Build-time metadata (set via -ldflags -X during build) +// These are embedded directly in the binary, so they're always accurate +var ( + GitCommit = "unknown" + GitBranch = "unknown" + GitVersion = "unknown" + BuildDate = "unknown" +) + +func logBuildInfo() { + log.Println("==============================================") + log.Println("Backend API - Build Information") + log.Println("==============================================") + log.Printf("Version: %s", GitVersion) + log.Printf("Commit: %s", GitCommit) + log.Printf("Branch: %s", GitBranch) + log.Printf("Repository: %s", getEnvOrDefault("GIT_REPO", "unknown")) + log.Printf("Built: %s", BuildDate) + log.Printf("Built by: %s", getEnvOrDefault("BUILD_USER", "unknown")) + log.Println("==============================================") +} + +func getEnvOrDefault(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + func main() { // Load environment from .env in development if present _ = godotenv.Overload(".env.local") _ = godotenv.Overload(".env") + // Log build information + logBuildInfo() + // Content service mode - minimal initialization, no K8s access needed if os.Getenv("CONTENT_SERVICE_MODE") == "true" { log.Println("Starting in CONTENT_SERVICE_MODE (no K8s client initialization)") @@ -94,7 +126,7 @@ func main() { handlers.DynamicClient = server.DynamicClient handlers.GetGitHubToken = handlers.WrapGitHubTokenForRepo(git.GetGitHubToken) handlers.DeriveRepoFolderFromURL = git.DeriveRepoFolderFromURL - handlers.SendMessageToSession = websocket.SendMessageToSession + // LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket // Initialize repo handlers (default implementation already set in client_selection.go) // GetK8sClientsForRequestRepoFunc uses getK8sClientsForRequestRepoDefault by default diff --git a/components/backend/routes.go b/components/backend/routes.go index 134026c55..a0231ac39 100644 --- a/components/backend/routes.go +++ b/components/backend/routes.go @@ -83,10 +83,14 @@ func registerRoutes(r *gin.Engine) { // OAuth integration - requires user auth like all other session endpoints projectGroup.GET("/agentic-sessions/:sessionName/oauth/:provider/url", handlers.GetOAuthURL) - projectGroup.GET("/sessions/:sessionId/ws", websocket.HandleSessionWebSocket) - projectGroup.GET("/sessions/:sessionId/messages", websocket.GetSessionMessagesWS) - // Removed: /messages/claude-format - Using SDK's built-in resume with persisted ~/.claude state - projectGroup.POST("/sessions/:sessionId/messages", websocket.PostSessionMessageWS) + // AG-UI Protocol endpoints (HttpAgent-compatible) + // See: https://docs.ag-ui.com/quickstart/introduction + // Runner is a FastAPI server - backend proxies requests and streams SSE responses + projectGroup.POST("/agentic-sessions/:sessionName/agui/run", websocket.HandleAGUIRunProxy) + projectGroup.POST("/agentic-sessions/:sessionName/agui/interrupt", websocket.HandleAGUIInterrupt) + projectGroup.GET("/agentic-sessions/:sessionName/agui/events", websocket.HandleAGUIEvents) + projectGroup.GET("/agentic-sessions/:sessionName/agui/history", websocket.HandleAGUIHistory) + projectGroup.GET("/agentic-sessions/:sessionName/agui/runs", websocket.HandleAGUIRuns) projectGroup.GET("/permissions", handlers.ListProjectPermissions) projectGroup.POST("/permissions", handlers.AddProjectPermission) diff --git a/components/backend/types/agui.go b/components/backend/types/agui.go new file mode 100644 index 000000000..4cb52d801 --- /dev/null +++ b/components/backend/types/agui.go @@ -0,0 +1,300 @@ +// Package types defines AG-UI protocol types for event streaming. +// Reference: https://docs.ag-ui.com/concepts/events +package types + +import "time" + +// AG-UI Event Types as defined in the protocol specification +// See: https://docs.ag-ui.com/concepts/events +const ( + // Lifecycle events + EventTypeRunStarted = "RUN_STARTED" + EventTypeRunFinished = "RUN_FINISHED" + EventTypeRunError = "RUN_ERROR" + + // Step events + EventTypeStepStarted = "STEP_STARTED" + EventTypeStepFinished = "STEP_FINISHED" + + // Text message events (streaming) + EventTypeTextMessageStart = "TEXT_MESSAGE_START" + EventTypeTextMessageContent = "TEXT_MESSAGE_CONTENT" + EventTypeTextMessageEnd = "TEXT_MESSAGE_END" + + // Tool call events (streaming) + EventTypeToolCallStart = "TOOL_CALL_START" + EventTypeToolCallArgs = "TOOL_CALL_ARGS" + EventTypeToolCallEnd = "TOOL_CALL_END" + + // State management events + EventTypeStateSnapshot = "STATE_SNAPSHOT" + EventTypStateDelta = "STATE_DELTA" + + // Message snapshot for restore/reconnect + EventTypeMessagesSnapshot = "MESSAGES_SNAPSHOT" + + // Activity events (frontend-only durable UI) + EventTypeActivitySnapshot = "ACTIVITY_SNAPSHOT" + EventTypeActivityDelta = "ACTIVITY_DELTA" + + // Raw event for pass-through + EventTypeRaw = "RAW" +) + +// AG-UI Message Roles +// See: https://docs.ag-ui.com/concepts/messages +const ( + RoleUser = "user" + RoleAssistant = "assistant" + RoleSystem = "system" + RoleTool = "tool" + RoleDeveloper = "developer" + RoleActivity = "activity" +) + +// BaseEvent is the common structure for all AG-UI events +// See: https://docs.ag-ui.com/concepts/events#baseeventproperties +type BaseEvent struct { + Type string `json:"type"` + ThreadID string `json:"threadId"` + RunID string `json:"runId"` + Timestamp string `json:"timestamp"` + // Optional fields + MessageID string `json:"messageId,omitempty"` + ParentRunID string `json:"parentRunId,omitempty"` +} + +// RunAgentInput is the input format for starting an AG-UI run +// See: https://docs.ag-ui.com/quickstart/introduction +type RunAgentInput struct { + ThreadID string `json:"threadId,omitempty"` + RunID string `json:"runId,omitempty"` + ParentRunID string `json:"parentRunId,omitempty"` + Messages []Message `json:"messages,omitempty"` + State map[string]interface{} `json:"state,omitempty"` + Tools []ToolDefinition `json:"tools,omitempty"` + Context map[string]interface{} `json:"context,omitempty"` +} + +// RunAgentOutput is the response after starting a run +type RunAgentOutput struct { + ThreadID string `json:"threadId"` + RunID string `json:"runId"` + ParentRunID string `json:"parentRunId,omitempty"` + StreamURL string `json:"streamUrl,omitempty"` +} + +// Message represents an AG-UI message in the conversation +// See: https://docs.ag-ui.com/concepts/messages +type Message struct { + ID string `json:"id"` + Role string `json:"role"` + Content string `json:"content,omitempty"` + ToolCalls []ToolCall `json:"toolCalls,omitempty"` + ToolCallID string `json:"toolCallId,omitempty"` + Name string `json:"name,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + Metadata interface{} `json:"metadata,omitempty"` +} + +// ToolCall represents a tool call made by the assistant +type ToolCall struct { + ID string `json:"id"` + Name string `json:"name"` + Args string `json:"args"` + Type string `json:"type,omitempty"` // "function" + ParentToolUseID string `json:"parentToolUseId,omitempty"` // For hierarchical nesting + Result string `json:"result,omitempty"` + Status string `json:"status,omitempty"` // "pending", "running", "completed", "error" + Error string `json:"error,omitempty"` + Duration int64 `json:"duration,omitempty"` // milliseconds +} + +// ToolDefinition describes an available tool +type ToolDefinition struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` +} + +// RunStartedEvent is emitted when a run begins +type RunStartedEvent struct { + BaseEvent + Input *RunAgentInput `json:"input,omitempty"` +} + +// RunFinishedEvent is emitted when a run completes successfully +type RunFinishedEvent struct { + BaseEvent + Output interface{} `json:"output,omitempty"` +} + +// RunErrorEvent is emitted when a run fails +type RunErrorEvent struct { + BaseEvent + Error string `json:"error"` + Code string `json:"code,omitempty"` + Details string `json:"details,omitempty"` +} + +// StepStartedEvent marks the beginning of a processing step +type StepStartedEvent struct { + BaseEvent + StepID string `json:"stepId"` + StepName string `json:"stepName"` +} + +// StepFinishedEvent marks the completion of a processing step +type StepFinishedEvent struct { + BaseEvent + StepID string `json:"stepId"` + StepName string `json:"stepName"` + Duration int64 `json:"duration,omitempty"` // milliseconds +} + +// TextMessageStartEvent begins a streaming text message +type TextMessageStartEvent struct { + BaseEvent + Role string `json:"role"` +} + +// TextMessageContentEvent contains a chunk of text content +type TextMessageContentEvent struct { + BaseEvent + Delta string `json:"delta"` +} + +// TextMessageEndEvent marks the end of a streaming text message +type TextMessageEndEvent struct { + BaseEvent +} + +// ToolCallStartEvent begins a streaming tool call +type ToolCallStartEvent struct { + BaseEvent + ToolCallID string `json:"toolCallId"` + ToolCallName string `json:"toolCallName"` + ParentMessageID string `json:"parentMessageId,omitempty"` + ParentToolUseID string `json:"parentToolUseId,omitempty"` +} + +// ToolCallArgsEvent contains a chunk of tool call arguments +type ToolCallArgsEvent struct { + BaseEvent + ToolCallID string `json:"toolCallId"` + Delta string `json:"delta"` +} + +// ToolCallEndEvent marks the end of a streaming tool call +type ToolCallEndEvent struct { + BaseEvent + ToolCallID string `json:"toolCallId"` + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` + Duration int64 `json:"duration,omitempty"` // milliseconds +} + +// StateSnapshotEvent provides complete state for hydration +type StateSnapshotEvent struct { + BaseEvent + State map[string]interface{} `json:"state"` +} + +// StateDeltaEvent provides incremental state updates +type StateDeltaEvent struct { + BaseEvent + Delta []StatePatch `json:"delta"` +} + +// StatePatch represents a JSON Patch operation for state updates +type StatePatch struct { + Op string `json:"op"` // "add", "remove", "replace" + Path string `json:"path"` // JSON Pointer + Value interface{} `json:"value,omitempty"` +} + +// MessagesSnapshotEvent provides complete message history for hydration +type MessagesSnapshotEvent struct { + BaseEvent + Messages []Message `json:"messages"` +} + +// ActivitySnapshotEvent provides complete activity UI state +type ActivitySnapshotEvent struct { + BaseEvent + Activities []Activity `json:"activities"` +} + +// ActivityDeltaEvent provides incremental activity updates +type ActivityDeltaEvent struct { + BaseEvent + Delta []ActivityPatch `json:"delta"` +} + +// Activity represents a durable frontend UI element +type Activity struct { + ID string `json:"id"` + Type string `json:"type"` + Title string `json:"title,omitempty"` + Status string `json:"status,omitempty"` // "pending", "running", "completed", "error" + Progress float64 `json:"progress,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` +} + +// ActivityPatch represents an update to an activity +type ActivityPatch struct { + Op string `json:"op"` // "add", "update", "remove" + Activity Activity `json:"activity"` +} + +// RawEvent allows pass-through of arbitrary data +type RawEvent struct { + BaseEvent + Data interface{} `json:"data"` +} + +// NewBaseEvent creates a new BaseEvent with current timestamp +func NewBaseEvent(eventType, threadID, runID string) BaseEvent { + return BaseEvent{ + Type: eventType, + ThreadID: threadID, + RunID: runID, + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), + } +} + +// WithMessageID adds a message ID to the event +func (e BaseEvent) WithMessageID(messageID string) BaseEvent { + e.MessageID = messageID + return e +} + +// WithParentRunID adds a parent run ID to the event +func (e BaseEvent) WithParentRunID(parentRunID string) BaseEvent { + e.ParentRunID = parentRunID + return e +} + +// AGUIEventLog represents the persisted event log structure +type AGUIEventLog struct { + ThreadID string `json:"threadId"` + RunID string `json:"runId"` + ParentRunID string `json:"parentRunId,omitempty"` + Events []BaseEvent `json:"events"` + CreatedAt string `json:"createdAt"` + UpdatedAt string `json:"updatedAt"` +} + +// AGUIRunMetadata contains metadata about a run for indexing +type AGUIRunMetadata struct { + ThreadID string `json:"threadId"` + RunID string `json:"runId"` + ParentRunID string `json:"parentRunId,omitempty"` + SessionName string `json:"sessionName"` + ProjectName string `json:"projectName"` + StartedAt string `json:"startedAt"` + FinishedAt string `json:"finishedAt,omitempty"` + Status string `json:"status"` // "running", "completed", "error" + EventCount int `json:"eventCount"` + RestartCount int `json:"restartCount,omitempty"` +} diff --git a/components/backend/websocket/agui.go b/components/backend/websocket/agui.go new file mode 100644 index 000000000..ce505e169 --- /dev/null +++ b/components/backend/websocket/agui.go @@ -0,0 +1,1048 @@ +// Package websocket provides AG-UI protocol endpoints for event streaming. +// See: https://docs.ag-ui.com/quickstart/introduction +package websocket + +import ( + "ambient-code-backend/handlers" + "ambient-code-backend/types" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "sync" + "time" + + "github.com/gin-gonic/gin" + authv1 "k8s.io/api/authorization/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// AG-UI run state tracking and storage +var ( + StateBaseDir string // Base directory for session state persistence (moved from hub.go) + + aguiRuns = make(map[string]*AGUIRunState) // runID -> state + aguiRunsMu sync.RWMutex + + // Thread-level subscribers: sessionID -> channels for ALL runs in thread + threadSubscribers = make(map[string]map[chan interface{}]bool) + threadSubscribersMu sync.RWMutex +) + +// AGUIRunState tracks the state of an AG-UI run +type AGUIRunState struct { + ThreadID string + RunID string + ParentRunID string + SessionID string // maps to our sessionName + ProjectName string + Status string // "running", "completed", "error" + StartedAt time.Time + subscribers map[chan *types.BaseEvent]bool + fullEventSub map[chan interface{}]bool // For full events with all fields + subscriberMu sync.RWMutex +} + +// Subscribe adds a subscriber to this run's events +func (r *AGUIRunState) Subscribe() chan *types.BaseEvent { + ch := make(chan *types.BaseEvent, 100) + r.subscriberMu.Lock() + r.subscribers[ch] = true + r.subscriberMu.Unlock() + return ch +} + +// Unsubscribe removes a subscriber from this run's events +func (r *AGUIRunState) Unsubscribe(ch chan *types.BaseEvent) { + r.subscriberMu.Lock() + delete(r.subscribers, ch) + close(ch) + r.subscriberMu.Unlock() +} + +// Broadcast sends an event to all subscribers +func (r *AGUIRunState) Broadcast(event *types.BaseEvent) { + r.subscriberMu.RLock() + defer r.subscriberMu.RUnlock() + for ch := range r.subscribers { + select { + case ch <- event: + default: + // Channel full, skip + } + } +} + +// BroadcastFull broadcasts full event with all fields (not just BaseEvent) +func (r *AGUIRunState) BroadcastFull(event interface{}) { + r.subscriberMu.RLock() + defer r.subscriberMu.RUnlock() + + // Send to full event subscribers + for ch := range r.fullEventSub { + select { + case ch <- event: + default: + // Channel full, skip + } + } + + // Also send BaseEvent to legacy subscribers + if baseEvent, ok := extractBaseEvent(event); ok { + for ch := range r.subscribers { + select { + case ch <- baseEvent: + default: + // Channel full, skip + } + } + } +} + +// RouteAGUIEvent routes an AG-UI event directly from WebSocket to subscribers +// This is the simplified flow - no SessionMessage wrapping, no translation needed +func RouteAGUIEvent(sessionID string, event map[string]interface{}) { + eventType, ok := event["type"].(string) + if !ok { + log.Printf("AGUI: Event missing type field, skipping") + return + } + + // Find active run for this session + var activeRunState *AGUIRunState + aguiRunsMu.RLock() + for _, state := range aguiRuns { + if state.SessionID == sessionID && state.Status == "running" { + activeRunState = state + break + } + } + aguiRunsMu.RUnlock() + + // If no active run found, check if event has a runId we should create + if activeRunState == nil { + // Don't create lazy runs for terminal events - they should only apply to existing runs + if isTerminalEventType(eventType) { + go persistAGUIEventMap(sessionID, "", event) + return + } + + eventRunID, ok := event["runId"].(string) + if ok && eventRunID != "" { + // Create run lazily from event's runId + threadID := sessionID + activeRunState = &AGUIRunState{ + ThreadID: threadID, + RunID: eventRunID, + SessionID: sessionID, + Status: "running", + StartedAt: time.Now(), + subscribers: make(map[chan *types.BaseEvent]bool), + fullEventSub: make(map[chan interface{}]bool), + } + aguiRunsMu.Lock() + aguiRuns[eventRunID] = activeRunState + aguiRunsMu.Unlock() + } else { + go persistAGUIEventMap(sessionID, "", event) + return + } + } + + threadID := activeRunState.ThreadID + runID := activeRunState.RunID + + // CRITICAL: Use runId from event if present (event is source of truth) + // Don't use activeRunState.RunID which might be stale + if eventRunID, ok := event["runId"].(string); ok && eventRunID != "" { + runID = eventRunID + } + if eventThreadID, ok := event["threadId"].(string); ok && eventThreadID != "" { + threadID = eventThreadID + } + + // Fill in missing IDs only if not present + if event["threadId"] == nil || event["threadId"] == "" { + event["threadId"] = threadID + } + if event["runId"] == nil || event["runId"] == "" { + event["runId"] = runID + } + + // Broadcast to run-specific SSE subscribers + activeRunState.BroadcastFull(event) + + // Also broadcast to thread-level subscribers (clients watching entire session) + threadSubscribersMu.RLock() + if subscribers, exists := threadSubscribers[sessionID]; exists { + for ch := range subscribers { + select { + case ch <- event: + default: + } + } + } + threadSubscribersMu.RUnlock() + + // Persist the event (use runID from event, not activeRunState) + go persistAGUIEventMap(sessionID, runID, event) + + // Check for terminal events - mark run as complete + if isTerminalEventType(eventType) { + activeRunState.Status = getTerminalStatusFromType(eventType) + + // Schedule cleanup of run state (no need to compact async - we compact on SSE connect) + go scheduleRunCleanup(runID, 5*time.Minute) + } +} + +// loadCompactedMessages loads pre-compacted messages from completed runs +// NOTE: Removed loadCompactedMessages and compactAndPersistRun functions. +// We now use "compact-on-read" strategy in streamThreadEvents. +// This eliminates race conditions, dual-file complexity, and async compaction issues. + +// persistAGUIEventMap persists a map[string]interface{} event to disk +func persistAGUIEventMap(sessionID, runID string, event map[string]interface{}) { + path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) + _ = ensureDir(fmt.Sprintf("%s/sessions/%s", StateBaseDir, sessionID)) + + data, err := json.Marshal(event) + if err != nil { + log.Printf("AGUI: failed to marshal event for persistence: %v", err) + return + } + + f, err := openFileAppend(path) + if err != nil { + log.Printf("AGUI: failed to open event log: %v", err) + return + } + defer f.Close() + + if _, err := f.Write(append(data, '\n')); err != nil { + log.Printf("AGUI: failed to write event: %v", err) + return + } + +} + +// isTerminalEventType checks if an event type indicates run completion +func isTerminalEventType(eventType string) bool { + switch eventType { + case types.EventTypeRunFinished, types.EventTypeRunError: + return true + } + return false +} + +// getTerminalStatusFromType returns the run status for a terminal event type +func getTerminalStatusFromType(eventType string) string { + switch eventType { + case types.EventTypeRunFinished: + return "completed" + case types.EventTypeRunError: + return "error" + default: + return "completed" + } +} + +// extractBaseEvent extracts the BaseEvent from any AG-UI event type +func extractBaseEvent(event interface{}) (*types.BaseEvent, bool) { + switch e := event.(type) { + case *types.BaseEvent: + return e, true + case *types.TextMessageStartEvent: + return &e.BaseEvent, true + case *types.TextMessageContentEvent: + return &e.BaseEvent, true + case *types.TextMessageEndEvent: + return &e.BaseEvent, true + case *types.ToolCallStartEvent: + return &e.BaseEvent, true + case *types.ToolCallArgsEvent: + return &e.BaseEvent, true + case *types.ToolCallEndEvent: + return &e.BaseEvent, true + case *types.StepStartedEvent: + return &e.BaseEvent, true + case *types.StepFinishedEvent: + return &e.BaseEvent, true + case *types.RunStartedEvent: + return &e.BaseEvent, true + case *types.RunFinishedEvent: + return &e.BaseEvent, true + case *types.RunErrorEvent: + return &e.BaseEvent, true + case *types.StateSnapshotEvent: + return &e.BaseEvent, true + case *types.StateDeltaEvent: + return &e.BaseEvent, true + case *types.MessagesSnapshotEvent: + return &e.BaseEvent, true + case *types.ActivitySnapshotEvent: + return &e.BaseEvent, true + case *types.ActivityDeltaEvent: + return &e.BaseEvent, true + case *types.RawEvent: + return &e.BaseEvent, true + default: + return nil, false + } +} + +// LEGACY: Old HandleAGUIRun function removed - replaced by HandleAGUIRunProxy +// The new proxy forwards requests to the runner's FastAPI server instead of using WebSocket + +// streamThreadEvents streams events from ALL runs in a thread (session) +// This is the correct AG-UI pattern: client connects to thread, not individual runs +func streamThreadEvents(c *gin.Context, projectName, sessionName string) { + threadID := sessionName + eventCh := make(chan interface{}, 100) + ctx := c.Request.Context() + + // Subscribe to all current and future runs for this session + threadSubscribersMu.Lock() + if threadSubscribers[sessionName] == nil { + threadSubscribers[sessionName] = make(map[chan interface{}]bool) + } + threadSubscribers[sessionName][eventCh] = true + threadSubscribersMu.Unlock() + + defer func() { + threadSubscribersMu.Lock() + delete(threadSubscribers[sessionName], eventCh) + if len(threadSubscribers[sessionName]) == 0 { + delete(threadSubscribers, sessionName) + } + threadSubscribersMu.Unlock() + close(eventCh) + }() + + // OPTION 1: Compact-on-Read Strategy (COMPLETED RUNS ONLY) + // Load events from agui-events.jsonl and compact only COMPLETED runs + // Active/in-progress runs will be streamed raw + + // Declare outside so it's accessible later for replaying active runs + activeRunIDs := make(map[string]bool) + + events, err := loadEventsForRun(sessionName, "") + if err == nil && len(events) > 0 { + + // CRITICAL FIX: Determine which runs are TRULY active by checking event log + // A run is only active if NO terminal event exists in the log + runHasTerminalEvent := make(map[string]bool) + for _, event := range events { + eventRunID, ok := event["runId"].(string) + if !ok { + continue + } + eventType, ok := event["type"].(string) + if !ok { + continue + } + + if eventRunID != "" && isTerminalEventType(eventType) { + runHasTerminalEvent[eventRunID] = true + } + } + + // Check in-memory state and override with event log truth + // Also fix stale in-memory state + aguiRunsMu.Lock() + for _, state := range aguiRuns { + if state.SessionID == sessionName { + runID := state.RunID + // Only consider active if NO terminal event in log + if !runHasTerminalEvent[runID] { + activeRunIDs[runID] = true + } else { + // Fix stale memory state + if state.Status == "running" { + state.Status = "completed" + } + } + } + } + aguiRunsMu.Unlock() + + // Filter to only events from COMPLETED runs (have terminal event) + completedEvents := make([]map[string]interface{}, 0) + skippedCount := 0 + for _, event := range events { + eventRunID, ok := event["runId"].(string) + if !ok { + continue + } + + // Skip events without runId + if eventRunID == "" { + skippedCount++ + continue + } + + // Skip events from active runs (no terminal event yet) + if activeRunIDs[eventRunID] { + skippedCount++ + continue + } + + // Include events from completed runs + completedEvents = append(completedEvents, event) + } + + if len(completedEvents) > 0 { + // Compact only completed run events + messages := CompactEvents(completedEvents) + + // Send single MESSAGES_SNAPSHOT with compacted messages from COMPLETED runs + if len(messages) > 0 { + snapshot := &types.MessagesSnapshotEvent{ + BaseEvent: types.NewBaseEvent(types.EventTypeMessagesSnapshot, threadID, "thread-snapshot"), + Messages: messages, + } + writeSSEEvent(c.Writer, snapshot) + c.Writer.(http.Flusher).Flush() + } + } + } else if err != nil { + log.Printf("AGUI: Failed to load events: %v", err) + } + + // Replay ALL active runs (not just most recent) + // CRITICAL: This ensures all non-compacted events are sent to client + aguiRunsMu.RLock() + activeRunStates := make([]*AGUIRunState, 0) + for _, state := range aguiRuns { + if state.SessionID == sessionName && activeRunIDs[state.RunID] { + activeRunStates = append(activeRunStates, state) + } + } + aguiRunsMu.RUnlock() + + if len(activeRunStates) > 0 { + + // Load all events once + allEvents, err := loadEventsForRun(sessionName, "") + if err == nil { + for _, activeRunState := range activeRunStates { + // Send RUN_STARTED for this active run + runStarted := &types.RunStartedEvent{ + BaseEvent: types.NewBaseEvent(types.EventTypeRunStarted, threadID, activeRunState.RunID), + } + if activeRunState.ParentRunID != "" { + runStarted.ParentRunID = activeRunState.ParentRunID + } + writeSSEEvent(c.Writer, runStarted) + + // Send state snapshot + sendBasicStateSnapshot(c, activeRunState, projectName, sessionName) + + // Collect events for this run + runEvents := make([]map[string]interface{}, 0) + for _, event := range allEvents { + eventRunID, ok := event["runId"].(string) + if ok && eventRunID == activeRunState.RunID { + runEvents = append(runEvents, event) + } + } + + // Replay raw events + if len(runEvents) > 0 { + for _, event := range runEvents { + writeSSEEvent(c.Writer, event) + } + } + } + c.Writer.(http.Flusher).Flush() + } + } + + // Stream events from all future runs with keepalive + keepaliveTicker := time.NewTicker(15 * time.Second) + defer keepaliveTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-keepaliveTicker.C: + // Send SSE comment to prevent gateway timeout + _, err := c.Writer.Write([]byte(": keepalive\n\n")) + if err != nil { + log.Printf("AGUI: Keepalive write failed, closing stream: %v", err) + return + } + c.Writer.(http.Flusher).Flush() + case event, ok := <-eventCh: + if !ok { + return + } + writeSSEEvent(c.Writer, event) + c.Writer.(http.Flusher).Flush() + } + } +} + +// HandleAGUIEvents handles GET /api/projects/:projectName/agentic-sessions/:sessionName/agui/events +// This is the AG-UI SSE stream endpoint +// See: https://docs.ag-ui.com/quickstart/middleware +func HandleAGUIEvents(c *gin.Context) { + projectName := c.Param("projectName") + sessionName := c.Param("sessionName") + runID := c.Query("runId") + + // SECURITY: Authenticate user and get user-scoped K8s client + reqK8s, _ := handlers.GetK8sClientsForRequest(c) + if reqK8s == nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) + c.Abort() + return + } + + // SECURITY: Verify user has permission to read this session + ctx := context.Background() + ssar := &authv1.SelfSubjectAccessReview{ + Spec: authv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authv1.ResourceAttributes{ + Group: "vteam.ambient-code", + Resource: "agenticsessions", + Verb: "get", + Namespace: projectName, + Name: sessionName, + }, + }, + } + res, err := reqK8s.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, ssar, metav1.CreateOptions{}) + if err != nil || !res.Status.Allowed { + log.Printf("AGUI Events: User not authorized to read session %s/%s", projectName, sessionName) + c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized"}) + c.Abort() + return + } + + // Set SSE headers + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("X-Accel-Buffering", "no") + + // If no runId specified, stream the entire THREAD (all runs for this session) + // This is the correct AG-UI pattern: client connects once to thread stream + if runID == "" { + streamThreadEvents(c, projectName, sessionName) + return + } + + // Legacy: specific run streaming (kept for compatibility) + + var runState *AGUIRunState + aguiRunsMu.RLock() + runState = aguiRuns[runID] + aguiRunsMu.RUnlock() + + if runState == nil { + // Create an implicit run for this connection + threadID := sessionName + runState = &AGUIRunState{ + ThreadID: threadID, + RunID: runID, + SessionID: sessionName, + ProjectName: projectName, + Status: "running", + StartedAt: time.Now(), + subscribers: make(map[chan *types.BaseEvent]bool), + fullEventSub: make(map[chan interface{}]bool), + } + aguiRunsMu.Lock() + aguiRuns[runID] = runState + aguiRunsMu.Unlock() + } + + // Subscribe to full events (includes Delta, ToolCallID, etc.) + fullEventCh := make(chan interface{}, 100) + runState.subscriberMu.Lock() + runState.fullEventSub[fullEventCh] = true + runState.subscriberMu.Unlock() + defer func() { + runState.subscriberMu.Lock() + delete(runState.fullEventSub, fullEventCh) + runState.subscriberMu.Unlock() + close(fullEventCh) + }() + + // Send initial sync events (with panic recovery) + func() { + defer func() { + if r := recover(); r != nil { + log.Printf("AGUI: panic in sendInitialSyncEvents: %v", r) + } + }() + sendInitialSyncEvents(c, runState, projectName, sessionName) + }() + + // Create context for client disconnection + streamCtx := c.Request.Context() + + // Stream events + for { + select { + case <-streamCtx.Done(): + return + case event, ok := <-fullEventCh: + if !ok { + return + } + writeSSEEvent(c.Writer, event) + c.Writer.(http.Flusher).Flush() + } + } +} + +// sendInitialSyncEvents sends snapshot events on connection/reconnection +// This implements the reconnect/restore strategy per AG-UI serialization guidance +func sendInitialSyncEvents(c *gin.Context, runState *AGUIRunState, projectName, sessionName string) { + threadID := runState.ThreadID + runID := runState.RunID + + // 1. Send RUN_STARTED + runStarted := &types.RunStartedEvent{ + BaseEvent: types.NewBaseEvent(types.EventTypeRunStarted, threadID, runID), + } + if runState.ParentRunID != "" { + runStarted.ParentRunID = runState.ParentRunID + } + writeSSEEvent(c.Writer, runStarted) + + // 2. Send basic state snapshot (always succeeds) + sendBasicStateSnapshot(c, runState, projectName, sessionName) + + // 3. Compact stored events and send MESSAGES_SNAPSHOT + // Per AG-UI spec: compact at read-time, not write-time + events, err := loadEventsForRun(sessionName, runID) + if err != nil { + log.Printf("AGUI: Failed to load events for %s: %v", sessionName, err) + } + + if len(events) > 0 { + messages := CompactEvents(events) + + if len(messages) > 0 { + snapshot := &types.MessagesSnapshotEvent{ + BaseEvent: types.NewBaseEvent(types.EventTypeMessagesSnapshot, threadID, runID), + Messages: messages, + } + writeSSEEvent(c.Writer, snapshot) + } + } +} + +// sendBasicStateSnapshot sends a basic state snapshot with session metadata +func sendBasicStateSnapshot(c *gin.Context, runState *AGUIRunState, projectName, sessionName string) { + threadID := runState.ThreadID + runID := runState.RunID + + stateSnapshot := &types.StateSnapshotEvent{ + BaseEvent: types.NewBaseEvent(types.EventTypeStateSnapshot, threadID, runID), + State: map[string]interface{}{ + "sessionName": sessionName, + "projectName": projectName, + "status": runState.Status, + }, + } + + // Enrich with session data if available + sessionData, err := getSessionState(projectName, sessionName) + if err == nil && sessionData != nil { + for k, v := range sessionData { + stateSnapshot.State[k] = v + } + } + writeSSEEvent(c.Writer, stateSnapshot) +} + +// writeSSEEvent writes an event in SSE format +func writeSSEEvent(w http.ResponseWriter, event interface{}) { + data, err := json.Marshal(event) + if err != nil { + log.Printf("AGUI: failed to marshal event: %v", err) + return + } + + fmt.Fprintf(w, "data: %s\n\n", data) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } +} + +// scheduleRunCleanup removes a run from the active runs map after a delay +func scheduleRunCleanup(runID string, delay time.Duration) { + time.Sleep(delay) + aguiRunsMu.Lock() + if run, ok := aguiRuns[runID]; ok { + // Only delete if run is no longer active + if run.Status != "running" { + delete(aguiRuns, runID) + } + } + aguiRunsMu.Unlock() +} + +// cleanupOldRuns periodically cleans up old inactive runs +func init() { + go func() { + ticker := time.NewTicker(10 * time.Minute) + for range ticker.C { + cleanupInactiveRuns() + } + }() +} + +func cleanupInactiveRuns() { + cutoff := time.Now().Add(-30 * time.Minute) + aguiRunsMu.Lock() + defer aguiRunsMu.Unlock() + for runID, run := range aguiRuns { + if run.Status != "running" && run.StartedAt.Before(cutoff) { + delete(aguiRuns, runID) + } + } +} + +// Legacy translation functions removed - AG-UI events now route directly via RouteAGUIEvent + +// Helper functions for state and message retrieval + +func getSessionState(projectName, sessionName string) (map[string]interface{}, error) { + // Get session from K8s and extract relevant state + if handlers.DynamicClient == nil { + // Return basic state if K8s client not available + return map[string]interface{}{ + "phase": "Unknown", + "interactive": true, + }, nil + } + + gvr := handlers.GetAgenticSessionV1Alpha1Resource() + item, err := handlers.DynamicClient.Resource(gvr).Namespace(projectName).Get( + context.Background(), sessionName, metav1.GetOptions{}, + ) + if err != nil { + log.Printf("AGUI: failed to get session state: %v", err) + return map[string]interface{}{ + "phase": "Unknown", + "interactive": true, + }, nil + } + + state := make(map[string]interface{}) + + // Extract spec fields + if spec, ok := item.Object["spec"].(map[string]interface{}); ok { + if interactive, ok := spec["interactive"].(bool); ok { + state["interactive"] = interactive + } + if displayName, ok := spec["displayName"].(string); ok { + state["displayName"] = displayName + } + if repos, ok := spec["repos"].([]interface{}); ok { + state["repos"] = repos + } + if workflow, ok := spec["activeWorkflow"].(map[string]interface{}); ok { + state["activeWorkflow"] = workflow + } + } + + // Extract status fields + if status, ok := item.Object["status"].(map[string]interface{}); ok { + if phase, ok := status["phase"].(string); ok { + state["phase"] = phase + } + if sdkSessionID, ok := status["sdkSessionId"].(string); ok { + state["sdkSessionId"] = sdkSessionID + } + if restartCount, ok := status["sdkRestartCount"].(int64); ok { + state["sdkRestartCount"] = restartCount + } else if restartCount, ok := status["sdkRestartCount"].(float64); ok { + state["sdkRestartCount"] = int(restartCount) + } + if reconciledRepos, ok := status["reconciledRepos"].([]interface{}); ok { + state["reconciledRepos"] = reconciledRepos + } + } + + return state, nil +} + +// AG-UI event persistence +// Implements append-only event log per AG-UI serialization guidance: +// https://docs.ag-ui.com/concepts/serialization#serialization + +// persistRunMetadata saves run metadata for indexing +func persistRunMetadata(sessionID string, meta types.AGUIRunMetadata) { + path := fmt.Sprintf("%s/sessions/%s/agui-runs.jsonl", StateBaseDir, sessionID) + + _ = ensureDir(fmt.Sprintf("%s/sessions/%s", StateBaseDir, sessionID)) + + data, err := json.Marshal(meta) + if err != nil { + log.Printf("AGUI: failed to marshal run metadata: %v", err) + return + } + + f, err := openFileAppend(path) + if err != nil { + log.Printf("AGUI: failed to open runs index: %v", err) + return + } + defer f.Close() + + if _, err := f.Write(append(data, '\n')); err != nil { + log.Printf("AGUI: failed to write run metadata: %v", err) + } +} + +// loadRunsFromDisk loads persisted run metadata from disk +func loadRunsFromDisk(sessionID string) []types.AGUIRunMetadata { + path := fmt.Sprintf("%s/sessions/%s/agui-runs.jsonl", StateBaseDir, sessionID) + runs := make([]types.AGUIRunMetadata, 0) + + data, err := os.ReadFile(path) + if err != nil { + if !os.IsNotExist(err) { + log.Printf("AGUI: failed to read runs index: %v", err) + } + return runs + } + + lines := splitLines(data) + for _, line := range lines { + if len(line) == 0 { + continue + } + var meta types.AGUIRunMetadata + if err := json.Unmarshal(line, &meta); err == nil { + runs = append(runs, meta) + } + } + + return runs +} + +// loadEventsForRun loads all events for a session (thread) from disk +// Per AG-UI spec: all runs in a thread share the same event log +// Includes automatic migration from legacy message format +func loadEventsForRun(sessionID, runID string) ([]map[string]interface{}, error) { + path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) + + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + // Check if legacy messages.json exists and migrate + if err := MigrateLegacySessionToAGUI(sessionID); err != nil { + log.Printf("LegacyMigration: Failed to migrate session %s: %v", sessionID, err) + } else { + // Try reading again after migration + data, err = os.ReadFile(path) + if err != nil { + return []map[string]interface{}{}, nil + } + } + if len(data) == 0 { + return []map[string]interface{}{}, nil + } + } else { + return nil, err + } + } + + events := make([]map[string]interface{}, 0) + lines := splitLines(data) + for _, line := range lines { + if len(line) == 0 { + continue + } + var event map[string]interface{} + if err := json.Unmarshal(line, &event); err == nil { + // Filter by runID if specified + if runID != "" { + eventRunID, ok := event["runId"].(string) + if !ok || eventRunID != runID { + continue + } + } + events = append(events, event) + } + } + + return events, nil +} + +// splitLines splits bytes by newline +func splitLines(data []byte) [][]byte { + var lines [][]byte + start := 0 + for i, b := range data { + if b == '\n' { + line := data[start:i] + if len(line) > 0 { + lines = append(lines, line) + } + start = i + 1 + } + } + if start < len(data) { + lines = append(lines, data[start:]) + } + return lines +} + +// HandleAGUIHistory handles GET /api/projects/:projectName/agentic-sessions/:sessionName/agui/history +// Returns compacted message history for a session +func HandleAGUIHistory(c *gin.Context) { + projectName := c.Param("projectName") + sessionName := c.Param("sessionName") + + // SECURITY: Authenticate user and get user-scoped K8s client + reqK8s, _ := handlers.GetK8sClientsForRequest(c) + if reqK8s == nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) + c.Abort() + return + } + + // SECURITY: Verify user has permission to read this session + ctx := context.Background() + ssar := &authv1.SelfSubjectAccessReview{ + Spec: authv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authv1.ResourceAttributes{ + Group: "vteam.ambient-code", + Resource: "agenticsessions", + Verb: "get", + Namespace: projectName, + Name: sessionName, + }, + }, + } + res, err := reqK8s.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, ssar, metav1.CreateOptions{}) + if err != nil || !res.Status.Allowed { + log.Printf("AGUI History: User not authorized to read session %s/%s", projectName, sessionName) + c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized"}) + c.Abort() + return + } + runID := c.Query("runId") + + // Compact events to messages + var messages []types.Message + if runID != "" { + events, err := loadEventsForRun(sessionName, runID) + if err == nil { + messages = CompactEvents(events) + } + } + + // Get runs for this session + runs := getRunsForSession(sessionName) + + c.JSON(http.StatusOK, gin.H{ + "threadId": sessionName, + "runId": runID, + "messages": messages, + "runs": runs, + }) +} + +// HandleAGUIRuns handles GET /api/projects/:projectName/agentic-sessions/:sessionName/agui/runs +// Returns list of runs for a session (thread) +func HandleAGUIRuns(c *gin.Context) { + projectName := c.Param("projectName") + sessionName := c.Param("sessionName") + + // SECURITY: Authenticate user and get user-scoped K8s client + reqK8s, _ := handlers.GetK8sClientsForRequest(c) + if reqK8s == nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) + c.Abort() + return + } + + // SECURITY: Verify user has permission to read this session + ctx := context.Background() + ssar := &authv1.SelfSubjectAccessReview{ + Spec: authv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authv1.ResourceAttributes{ + Group: "vteam.ambient-code", + Resource: "agenticsessions", + Verb: "get", + Namespace: projectName, + Name: sessionName, + }, + }, + } + res, err := reqK8s.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, ssar, metav1.CreateOptions{}) + if err != nil || !res.Status.Allowed { + log.Printf("AGUI Runs: User not authorized to read session %s/%s", projectName, sessionName) + c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized"}) + c.Abort() + return + } + + runs := getRunsForSession(sessionName) + + c.JSON(http.StatusOK, gin.H{ + "threadId": sessionName, + "runs": runs, + }) +} + +func getRunsForSession(sessionID string) []types.AGUIRunMetadata { + // First load from disk (historical runs) + runs := loadRunsFromDisk(sessionID) + + // Create a set of run IDs from disk + diskRunIDs := make(map[string]bool) + for _, r := range runs { + diskRunIDs[r.RunID] = true + } + + // Add any active runs not yet persisted + aguiRunsMu.RLock() + for _, run := range aguiRuns { + if run.SessionID == sessionID && !diskRunIDs[run.RunID] { + meta := types.AGUIRunMetadata{ + ThreadID: run.ThreadID, + RunID: run.RunID, + ParentRunID: run.ParentRunID, + SessionName: run.SessionID, + ProjectName: run.ProjectName, + StartedAt: run.StartedAt.Format(time.RFC3339), + Status: run.Status, + } + runs = append(runs, meta) + } + } + aguiRunsMu.RUnlock() + + return runs +} + +// Helper file operations + +func ensureDir(path string) error { + return os.MkdirAll(path, 0755) +} + +func openFileAppend(path string) (*os.File, error) { + return os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) +} + +// Integration with existing hub - modify hub.go run() to also broadcast to AG-UI subscribers +// This is done by calling BroadcastToSessionSubscribers in the hub's broadcast case + +func init() { + // Hook into the hub to also broadcast to AG-UI subscribers + // We'll need to modify hub.go to call BroadcastToSessionSubscribers +} diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go new file mode 100644 index 000000000..6a60ba79b --- /dev/null +++ b/components/backend/websocket/agui_proxy.go @@ -0,0 +1,473 @@ +// Package websocket provides AG-UI protocol endpoints including HTTP proxy to runner. +package websocket + +import ( + "ambient-code-backend/handlers" + "ambient-code-backend/types" + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strings" + "time" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + authv1 "k8s.io/api/authorization/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// HandleAGUIRunProxy proxies AG-UI run requests to runner's FastAPI server +// This replaces the WebSocket-based communication with HTTP/SSE +func HandleAGUIRunProxy(c *gin.Context) { + projectName := c.Param("projectName") + sessionName := c.Param("sessionName") + + // SECURITY: Authenticate user and get user-scoped K8s client + reqK8s, _ := handlers.GetK8sClientsForRequest(c) + if reqK8s == nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) + c.Abort() + return + } + + // SECURITY: Verify user has permission to update this session + ctx := context.Background() + ssar := &authv1.SelfSubjectAccessReview{ + Spec: authv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authv1.ResourceAttributes{ + Group: "vteam.ambient-code", + Resource: "agenticsessions", + Verb: "update", + Namespace: projectName, + Name: sessionName, + }, + }, + } + res, err := reqK8s.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, ssar, metav1.CreateOptions{}) + if err != nil || !res.Status.Allowed { + log.Printf("AGUI Proxy: User not authorized to update session %s/%s", projectName, sessionName) + c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized"}) + c.Abort() + return + } + + log.Printf("AGUI Proxy: Forwarding run request for %s/%s", projectName, sessionName) + + var input types.RunAgentInput + if err := c.ShouldBindJSON(&input); err != nil { + log.Printf("AGUI Proxy: Failed to parse input: %v", err) + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("invalid input: %v", err)}) + return + } + log.Printf("AGUI Proxy: Input has %d messages", len(input.Messages)) + + // Generate or use provided IDs + threadID := input.ThreadID + if threadID == "" { + threadID = sessionName + } + runID := input.RunID + if runID == "" { + runID = uuid.New().String() + } + input.ThreadID = threadID + input.RunID = runID + + log.Printf("AGUI Proxy: Creating run %s for session %s (threadId=%s)", runID, sessionName, threadID) + + // Create run state for tracking + runState := &AGUIRunState{ + ThreadID: threadID, + RunID: runID, + ParentRunID: input.ParentRunID, + SessionID: sessionName, + ProjectName: projectName, + Status: "running", + StartedAt: time.Now(), + subscribers: make(map[chan *types.BaseEvent]bool), + fullEventSub: make(map[chan interface{}]bool), + } + + aguiRunsMu.Lock() + aguiRuns[runID] = runState + aguiRunsMu.Unlock() + + // Persist run metadata + go persistRunMetadata(sessionName, types.AGUIRunMetadata{ + ThreadID: threadID, + RunID: runID, + ParentRunID: input.ParentRunID, + SessionName: sessionName, + ProjectName: projectName, + StartedAt: runState.StartedAt.Format(time.RFC3339), + Status: "running", + }) + + // NOTE: User messages are now echoed by the runner (AG-UI server pattern) + // The runner emits TEXT_MESSAGE_START/CONTENT/END events which are persisted + // when they stream through this proxy. No need to echo them here. + + // Trigger async display name generation on first user message + // This generates a descriptive name using Claude Haiku based on the message + go triggerDisplayNameGenerationIfNeeded(projectName, sessionName, input.Messages) + + // Get runner endpoint + runnerURL, err := getRunnerEndpoint(projectName, sessionName) + if err != nil { + log.Printf("AGUI Proxy: Failed to get runner endpoint: %v", err) + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Runner not available"}) + return + } + + log.Printf("AGUI Proxy: Runner endpoint: %s", runnerURL) + + // Serialize input for proxy request + bodyBytes, err := json.Marshal(input) + if err != nil { + log.Printf("AGUI Proxy: Failed to serialize input: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to serialize input"}) + return + } + + log.Printf("AGUI Proxy: Run %s starting, will consume runner stream in background", runID) + + // Start background goroutine that owns the entire HTTP lifecycle + // This ensures the connection stays open after we return to client + // Note: We use context.Background() (not request context) because this goroutine + // must continue running after the HTTP request completes. The timeout and terminal + // event handling prevent unbounded goroutine accumulation. + go func() { + // Create request with long timeout (detached from client request lifecycle) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Hour) + defer cancel() + + proxyReq, err := http.NewRequestWithContext(ctx, "POST", runnerURL, bytes.NewReader(bodyBytes)) + if err != nil { + log.Printf("AGUI Proxy: Failed to create request in background: %v", err) + updateRunStatus(runID, "error") + return + } + + // Forward headers + proxyReq.Header.Set("Content-Type", "application/json") + proxyReq.Header.Set("Accept", "text/event-stream") + + // Execute request + client := &http.Client{ + Timeout: 0, // No timeout, context handles it + } + resp, err := client.Do(proxyReq) + if err != nil { + log.Printf("AGUI Proxy: Background request failed: %v", err) + updateRunStatus(runID, "error") + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Printf("AGUI Proxy: Runner returned status %d: %s", resp.StatusCode, string(body)) + updateRunStatus(runID, "error") + return + } + + log.Printf("AGUI Proxy: Background stream started for run %s", runID) + + reader := bufio.NewReader(resp.Body) + + for { + // Check if context was cancelled (timeout or cleanup) + select { + case <-ctx.Done(): + log.Printf("AGUI Proxy: Context cancelled for run %s", runID) + return + default: + } + + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + log.Printf("AGUI Proxy: Background stream ended for run %s", runID) + break + } + log.Printf("AGUI Proxy: Background stream read error: %v", err) + break + } + + // Parse and persist SSE events + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "data: ") { + jsonData := strings.TrimPrefix(line, "data: ") + handleStreamedEvent(sessionName, runID, threadID, jsonData, runState) + } + } + + // Mark run as completed + aguiRunsMu.RLock() + currentStatus := "completed" + if state, exists := aguiRuns[runID]; exists && state.Status == "error" { + currentStatus = "error" + } + aguiRunsMu.RUnlock() + + updateRunStatus(runID, currentStatus) + log.Printf("AGUI Proxy: Background stream completed for run %s (status=%s)", runID, currentStatus) + }() + + // Return run metadata immediately (don't wait for stream) + // Events will be broadcast to GET /agui/events subscribers + streamURL := fmt.Sprintf("/api/projects/%s/agentic-sessions/%s/agui/events", projectName, sessionName) + + c.JSON(http.StatusOK, gin.H{ + "threadId": threadID, + "runId": runID, + "streamUrl": streamURL, + "status": "started", + }) +} + +// handleStreamedEvent parses and persists a streamed AG-UI event +func handleStreamedEvent(sessionID, runID, threadID, jsonData string, runState *AGUIRunState) { + var event map[string]interface{} + if err := json.Unmarshal([]byte(jsonData), &event); err != nil { + log.Printf("AGUI Proxy: Failed to parse event JSON: %v", err) + return + } + + eventType, _ := event["type"].(string) + + // Ensure threadId and runId are set + if _, ok := event["threadId"]; !ok { + event["threadId"] = threadID + } + if _, ok := event["runId"]; !ok { + event["runId"] = runID + } + + // Check for terminal events + switch eventType { + case types.EventTypeRunFinished: + updateRunStatus(runID, "completed") + case types.EventTypeRunError: + updateRunStatus(runID, "error") + } + + // Persist event + persistAGUIEventMap(sessionID, runID, event) + + // Broadcast to subscribers (for SSE /events endpoint) + if runState != nil { + runState.BroadcastFull(event) + } + + // Also broadcast to thread subscribers + broadcastToThread(sessionID, event) +} + +// updateRunStatus updates the status of a run +func updateRunStatus(runID, status string) { + aguiRunsMu.Lock() + if state, exists := aguiRuns[runID]; exists { + state.Status = status + // Update persisted metadata + go persistRunMetadata(state.SessionID, types.AGUIRunMetadata{ + ThreadID: state.ThreadID, + RunID: state.RunID, + ParentRunID: state.ParentRunID, + SessionName: state.SessionID, + ProjectName: state.ProjectName, + StartedAt: state.StartedAt.Format(time.RFC3339), + Status: status, + }) + } + aguiRunsMu.Unlock() +} + +// HandleAGUIInterrupt sends interrupt signal to runner to stop current execution +// POST /api/projects/:projectName/agentic-sessions/:sessionName/agui/interrupt +func HandleAGUIInterrupt(c *gin.Context) { + projectName := c.Param("projectName") + sessionName := c.Param("sessionName") + + // SECURITY: Authenticate user and get user-scoped K8s client + reqK8s, _ := handlers.GetK8sClientsForRequest(c) + if reqK8s == nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) + c.Abort() + return + } + + // SECURITY: Verify user has permission to update this session + ctx := context.Background() + ssar := &authv1.SelfSubjectAccessReview{ + Spec: authv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authv1.ResourceAttributes{ + Group: "vteam.ambient-code", + Resource: "agenticsessions", + Verb: "update", + Namespace: projectName, + Name: sessionName, + }, + }, + } + res, err := reqK8s.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, ssar, metav1.CreateOptions{}) + if err != nil || !res.Status.Allowed { + log.Printf("AGUI Interrupt: User not authorized to update session %s/%s", projectName, sessionName) + c.JSON(http.StatusForbidden, gin.H{"error": "Unauthorized"}) + c.Abort() + return + } + + log.Printf("AGUI Interrupt: Request for %s/%s", projectName, sessionName) + + var input struct { + RunID string `json:"runId"` + } + if err := c.ShouldBindJSON(&input); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "runId required"}) + return + } + + // Get runner endpoint + runnerURL, err := getRunnerEndpoint(projectName, sessionName) + if err != nil { + log.Printf("AGUI Interrupt: Failed to get runner endpoint: %v", err) + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Runner not available"}) + return + } + + interruptURL := strings.TrimSuffix(runnerURL, "/") + "/interrupt" + log.Printf("AGUI Interrupt: Forwarding to runner: %s", interruptURL) + + // POST to runner's interrupt endpoint + req, err := http.NewRequest("POST", interruptURL, bytes.NewReader([]byte("{}"))) + if err != nil { + log.Printf("AGUI Interrupt: Failed to create request: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + log.Printf("AGUI Interrupt: Request failed: %v", err) + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + log.Printf("AGUI Interrupt: Runner returned %d: %s", resp.StatusCode, string(body)) + c.JSON(resp.StatusCode, gin.H{"error": string(body)}) + return + } + + log.Printf("AGUI Interrupt: Successfully interrupted run %s", input.RunID) + c.JSON(http.StatusOK, gin.H{"message": "Interrupt signal sent"}) +} + +// getRunnerEndpoint returns the AG-UI server endpoint for a session +// The operator creates a Service named "session-{sessionName}" in the project namespace +func getRunnerEndpoint(projectName, sessionName string) (string, error) { + // Use naming convention for service discovery + // Format: http://session-{sessionName}.{projectName}.svc.cluster.local:8000/ + // The operator creates this Service automatically when spawning the runner Job + return fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8000/", sessionName, projectName), nil +} + +// broadcastToThread sends event to all thread-level subscribers +func broadcastToThread(sessionID string, event interface{}) { + threadSubscribersMu.RLock() + subs, exists := threadSubscribers[sessionID] + threadSubscribersMu.RUnlock() + + if !exists { + return + } + + for ch := range subs { + select { + case ch <- event: + default: + // Channel full, skip + } + } +} + +// triggerDisplayNameGenerationIfNeeded checks if the session needs a display name +// and triggers async generation using the first REAL user message (not auto-sent initialPrompt) +func triggerDisplayNameGenerationIfNeeded(projectName, sessionName string, messages []types.Message) { + // Extract first user message + var userMessage string + for _, msg := range messages { + if msg.Role == "user" && msg.Content != "" { + userMessage = msg.Content + break + } + } + + if userMessage == "" { + log.Printf("DisplayNameGen: No user message found in run request for %s/%s", projectName, sessionName) + return + } + + // Check if session already has a display name + if handlers.DynamicClient == nil { + log.Printf("DisplayNameGen: DynamicClient not initialized, skipping display name generation") + return + } + + gvr := handlers.GetAgenticSessionV1Alpha1Resource() + ctx := context.Background() + + item, err := handlers.DynamicClient.Resource(gvr).Namespace(projectName).Get(ctx, sessionName, metav1.GetOptions{}) + if err != nil { + log.Printf("DisplayNameGen: Failed to get session %s/%s: %v", projectName, sessionName, err) + return + } + + // Extract spec using unstructured helpers (per CLAUDE.md guidelines) + spec, found, err := unstructured.NestedMap(item.Object, "spec") + if err != nil || !found { + log.Printf("DisplayNameGen: Failed to get spec for %s/%s", projectName, sessionName) + return + } + + // Skip if this message is the auto-sent initialPrompt (not a real user message) + initialPrompt, _, _ := unstructured.NestedString(spec, "initialPrompt") + if initialPrompt != "" && strings.TrimSpace(userMessage) == strings.TrimSpace(initialPrompt) { + log.Printf("DisplayNameGen: Skipping auto-sent initialPrompt for %s/%s", projectName, sessionName) + return + } + + // Check if display name generation is needed + if !handlers.ShouldGenerateDisplayName(spec) { + log.Printf("DisplayNameGen: Session %s/%s already has display name, skipping", projectName, sessionName) + return + } + + // Extract session context for better name generation + sessionCtx := handlers.ExtractSessionContext(spec) + + log.Printf("DisplayNameGen: Triggering async generation for %s/%s with message: %q", + projectName, sessionName, truncateForLog(userMessage, 50)) + + // Trigger async generation (runs in background, fails silently) + handlers.GenerateDisplayNameAsync(projectName, sessionName, userMessage, sessionCtx) +} + +// truncateForLog truncates a string for logging purposes +func truncateForLog(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} diff --git a/components/backend/websocket/compaction.go b/components/backend/websocket/compaction.go new file mode 100644 index 000000000..d03f77f7c --- /dev/null +++ b/components/backend/websocket/compaction.go @@ -0,0 +1,390 @@ +package websocket + +import ( + "ambient-code-backend/types" + "log" + + "github.com/google/uuid" +) + +// MessageCompactor compacts AG-UI events into message snapshots +// Per AG-UI spec: https://docs.ag-ui.com/concepts/serialization +type MessageCompactor struct { + messages []types.Message + currentMessage *types.Message + activeToolCalls map[string]*ActiveToolCall // toolId -> tool state + hiddenMessages map[string]bool // messageId -> hidden flag +} + +// ActiveToolCall tracks an in-progress tool call +type ActiveToolCall struct { + ID string + Name string + Args string // Accumulated from TOOL_CALL_ARGS deltas + ParentToolUseID string + Status string +} + +// NewMessageCompactor creates a new message compactor +func NewMessageCompactor() *MessageCompactor { + return &MessageCompactor{ + messages: make([]types.Message, 0), + activeToolCalls: make(map[string]*ActiveToolCall), + hiddenMessages: make(map[string]bool), + } +} + +// HandleEvent processes a single AG-UI event and updates compacted state +func (c *MessageCompactor) HandleEvent(event map[string]interface{}) { + eventType, _ := event["type"].(string) + + switch eventType { + case types.EventTypeTextMessageStart: + c.handleTextMessageStart(event) + case types.EventTypeTextMessageContent: + c.handleTextMessageContent(event) + case types.EventTypeTextMessageEnd: + c.handleTextMessageEnd(event) + case types.EventTypeToolCallStart: + c.handleToolCallStart(event) + case types.EventTypeToolCallArgs: + c.handleToolCallArgs(event) + case types.EventTypeToolCallEnd: + c.handleToolCallEnd(event) + case types.EventTypeRaw: + c.handleRawEvent(event) + case types.EventTypeMessagesSnapshot: + c.handleMessagesSnapshot(event) + case types.EventTypeRunStarted, types.EventTypeRunFinished, types.EventTypeRunError: + // Lifecycle events - skip, don't affect message compaction + case types.EventTypeStepStarted, types.EventTypeStepFinished: + // Step events - skip, don't affect message compaction + case types.EventTypeStateSnapshot, types.EventTypStateDelta: + // State events - skip, don't affect message compaction + case types.EventTypeActivitySnapshot, types.EventTypeActivityDelta: + // Activity events - skip, don't affect message compaction + default: + log.Printf("Compaction: WARNING - Unhandled event type: %s", eventType) + } +} + +// GetMessages returns the compacted messages (excluding hidden ones) +func (c *MessageCompactor) GetMessages() []types.Message { + // Flush any active message + if c.currentMessage != nil { + c.messages = append(c.messages, *c.currentMessage) + c.currentMessage = nil + } + + // DO NOT include in-progress tools in snapshots! + // Snapshots should only contain COMPLETED runs with finished tool calls. + // In-progress tools will be streamed as raw events from the active run. + // + // If we included "running" status tools here, they would duplicate when + // the active run's TOOL_CALL_END events are replayed. + if len(c.activeToolCalls) > 0 { + // Clear activeToolCalls - don't include them in snapshot + c.activeToolCalls = make(map[string]*ActiveToolCall) + } + + // Filter out hidden messages (auto-sent initial/workflow prompts) + visibleMessages := make([]types.Message, 0, len(c.messages)) + hiddenCount := 0 + for _, msg := range c.messages { + if c.hiddenMessages[msg.ID] { + hiddenCount++ + continue + } + visibleMessages = append(visibleMessages, msg) + } + + return visibleMessages +} + +// Event Handlers + +func (c *MessageCompactor) handleTextMessageStart(event map[string]interface{}) { + // Flush previous message if any + if c.currentMessage != nil { + c.messages = append(c.messages, *c.currentMessage) + } + + // Handle both camelCase and snake_case + messageID, _ := event["messageId"].(string) + if messageID == "" { + messageID, _ = event["message_id"].(string) + } + role, _ := event["role"].(string) + if role == "" { + role = types.RoleAssistant + } + + c.currentMessage = &types.Message{ + ID: messageID, + Role: role, + Content: "", + } +} + +func (c *MessageCompactor) handleTextMessageContent(event map[string]interface{}) { + if c.currentMessage == nil { + return + } + + delta, _ := event["delta"].(string) + c.currentMessage.Content += delta +} + +func (c *MessageCompactor) handleTextMessageEnd(event map[string]interface{}) { + if c.currentMessage != nil { + // User messages never have tool calls - flush immediately + // Assistant messages might have tool calls - keep open + // We'll flush when a new TEXT_MESSAGE_START arrives or at the end of compaction + if c.currentMessage.Role == types.RoleUser { + c.messages = append(c.messages, *c.currentMessage) + c.currentMessage = nil + } + } +} + +func (c *MessageCompactor) handleToolCallStart(event map[string]interface{}) { + // Handle both camelCase (TypeScript) and snake_case (Python ag_ui.core) + toolID, _ := event["toolCallId"].(string) + if toolID == "" { + toolID, _ = event["tool_call_id"].(string) + } + toolName, _ := event["toolCallName"].(string) + if toolName == "" { + toolName, _ = event["tool_call_name"].(string) + } + + // Try multiple field names for parent tool ID + parentToolUseID, _ := event["parentToolUseId"].(string) + if parentToolUseID == "" { + parentToolUseID, _ = event["parentToolUseID"].(string) + } + if parentToolUseID == "" { + parentToolUseID, _ = event["parent_tool_call_id"].(string) + } + + if toolID != "" { + c.activeToolCalls[toolID] = &ActiveToolCall{ + ID: toolID, + Name: toolName, + Args: "", + ParentToolUseID: parentToolUseID, + Status: "running", + } + } +} + +func (c *MessageCompactor) handleToolCallArgs(event map[string]interface{}) { + // Handle both camelCase and snake_case + toolID, _ := event["toolCallId"].(string) + if toolID == "" { + toolID, _ = event["tool_call_id"].(string) + } + delta, _ := event["delta"].(string) + + if toolID == "" { + return + } + + if active, ok := c.activeToolCalls[toolID]; ok { + active.Args += delta + } +} + +func (c *MessageCompactor) handleToolCallEnd(event map[string]interface{}) { + // Handle both camelCase and snake_case + toolID, _ := event["toolCallId"].(string) + if toolID == "" { + toolID, _ = event["tool_call_id"].(string) + } + result, _ := event["result"].(string) + errorStr, _ := event["error"].(string) + + if toolID == "" { + return + } + + active, ok := c.activeToolCalls[toolID] + if !ok { + return + } + + // Create completed tool call + tc := types.ToolCall{ + ID: active.ID, + Name: active.Name, + Args: active.Args, + Type: "function", + ParentToolUseID: active.ParentToolUseID, + Result: result, + Status: "completed", + } + if errorStr != "" { + tc.Error = errorStr + tc.Status = "error" + } + + // Add to message + // Check if we need to create a new message or add to current + if c.currentMessage != nil && c.currentMessage.Role == types.RoleAssistant { + // Add to current message + c.currentMessage.ToolCalls = append(c.currentMessage.ToolCalls, tc) + } else { + // Create new message for this tool call + c.messages = append(c.messages, types.Message{ + ID: uuid.New().String(), + Role: types.RoleAssistant, + ToolCalls: []types.ToolCall{tc}, + }) + } + + // Remove from active + delete(c.activeToolCalls, toolID) +} + +func (c *MessageCompactor) handleRawEvent(event map[string]interface{}) { + // Check for both "data" and "event" fields (AG-UI uses "event") + var data map[string]interface{} + if d, ok := event["event"].(map[string]interface{}); ok { + data = d + } else if d, ok := event["data"].(map[string]interface{}); ok { + data = d + } else { + return + } + + // Handle message_metadata events (for hiding auto-sent prompts) + if msgType, _ := data["type"].(string); msgType == "message_metadata" { + if hidden, _ := data["hidden"].(bool); hidden { + if messageID, ok := data["messageId"].(string); ok { + c.hiddenMessages[messageID] = true + } + } + return + } + + role, _ := data["role"].(string) + if role == "" { + return + } + + // Flush current message + if c.currentMessage != nil { + c.messages = append(c.messages, *c.currentMessage) + c.currentMessage = nil + } + + // Add raw message + msg := types.Message{Role: role} + if id, ok := data["id"].(string); ok { + msg.ID = id + } + if content, ok := data["content"].(string); ok { + msg.Content = content + } + if timestamp, ok := data["timestamp"].(string); ok { + msg.Timestamp = timestamp + } + + c.messages = append(c.messages, msg) +} + +func (c *MessageCompactor) handleMessagesSnapshot(event map[string]interface{}) { + // If runner sends MESSAGES_SNAPSHOT, use it directly (overrides compaction) + msgs, ok := event["messages"].([]interface{}) + if !ok { + return + } + + // Replace all messages with snapshot + c.messages = make([]types.Message, 0, len(msgs)) + c.currentMessage = nil + + for _, m := range msgs { + msgMap, ok := m.(map[string]interface{}) + if !ok { + continue + } + + msg := types.Message{} + if id, ok := msgMap["id"].(string); ok { + msg.ID = id + } + if role, ok := msgMap["role"].(string); ok { + msg.Role = role + } + if content, ok := msgMap["content"].(string); ok { + msg.Content = content + } + if timestamp, ok := msgMap["timestamp"].(string); ok { + msg.Timestamp = timestamp + } + + // Extract toolCalls array + if toolCalls, ok := msgMap["toolCalls"].([]interface{}); ok { + msg.ToolCalls = make([]types.ToolCall, 0, len(toolCalls)) + for _, tc := range toolCalls { + tcMap, ok := tc.(map[string]interface{}) + if !ok { + continue + } + + toolCall := types.ToolCall{} + if id, ok := tcMap["id"].(string); ok { + toolCall.ID = id + } + if name, ok := tcMap["name"].(string); ok { + toolCall.Name = name + } + if args, ok := tcMap["args"].(string); ok { + toolCall.Args = args + } + if tcType, ok := tcMap["type"].(string); ok { + toolCall.Type = tcType + } + if parentID, ok := tcMap["parentToolUseId"].(string); ok { + toolCall.ParentToolUseID = parentID + } + if result, ok := tcMap["result"].(string); ok { + toolCall.Result = result + } + if status, ok := tcMap["status"].(string); ok { + toolCall.Status = status + } + if errorStr, ok := tcMap["error"].(string); ok { + toolCall.Error = errorStr + } + + msg.ToolCalls = append(msg.ToolCalls, toolCall) + } + } + + c.messages = append(c.messages, msg) + } + +} + +// CompactEvents is the main entry point for event compaction +func CompactEvents(events []map[string]interface{}) []types.Message { + + // Count event types to help debug + eventTypeCounts := make(map[string]int) + for _, event := range events { + eventType, _ := event["type"].(string) + eventTypeCounts[eventType]++ + } + + compactor := NewMessageCompactor() + + for _, event := range events { + compactor.HandleEvent(event) + } + + messages := compactor.GetMessages() + + return messages +} diff --git a/components/backend/websocket/handlers.go b/components/backend/websocket/handlers.go deleted file mode 100644 index 5c3a5ad05..000000000 --- a/components/backend/websocket/handlers.go +++ /dev/null @@ -1,333 +0,0 @@ -package websocket - -import ( - "context" - "encoding/json" - "fmt" - "log" - "net/http" - "strings" - "time" - - "ambient-code-backend/handlers" - - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// WebSocket upgrader -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - // Allow all origins for development - should be restricted in production - return true - }, -} - -// HandleSessionWebSocket handles WebSocket connections for sessions -// Route: /projects/:projectName/sessions/:sessionId/ws -func HandleSessionWebSocket(c *gin.Context) { - sessionID := c.Param("sessionId") - log.Printf("handleSessionWebSocket for session: %s", sessionID) - - // Access enforced by RBAC on downstream resources - - // Best-effort user identity: prefer forwarded user, else extract ServiceAccount from bearer token - var userIDStr string - if v, ok := c.Get("userID"); ok { - if s, ok2 := v.(string); ok2 { - userIDStr = s - } - } - if userIDStr == "" { - if ns, sa, ok := handlers.ExtractServiceAccountFromAuth(c); ok { - userIDStr = ns + ":" + sa - } - } - - // Upgrade HTTP connection to WebSocket - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - log.Printf("WebSocket upgrade failed: %v", err) - return - } - - sessionConn := &SessionConnection{ - SessionID: sessionID, - Conn: conn, - UserID: userIDStr, - } - - // Register connection - Hub.register <- sessionConn - - // Handle messages from client - go handleWebSocketMessages(sessionConn) - - // Keep connection alive - go handleWebSocketPing(sessionConn) -} - -// handleWebSocketMessages processes incoming WebSocket messages -func handleWebSocketMessages(conn *SessionConnection) { - defer func() { - Hub.unregister <- conn - }() - - for { - messageType, messageData, err := conn.Conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Printf("WebSocket error: %v", err) - } - break - } - - if messageType == websocket.TextMessage { - var msg map[string]interface{} - if err := json.Unmarshal(messageData, &msg); err != nil { - log.Printf("Failed to parse WebSocket message: %v", err) - continue - } - - // Handle control messages - if msgType, ok := msg["type"].(string); ok { - if msgType == "ping" { - // Respond with pong - pong := map[string]interface{}{ - "type": "pong", - "timestamp": time.Now().UTC().Format(time.RFC3339), - } - pongData, _ := json.Marshal(pong) - // Lock write mutex before writing pong - conn.writeMu.Lock() - _ = conn.Conn.WriteMessage(websocket.TextMessage, pongData) - conn.writeMu.Unlock() - continue - } - // Extract payload from runner message to avoid double-nesting - // Runner sends: {type, seq, timestamp, payload} - // We only want to store the payload field - payload, ok := msg["payload"].(map[string]interface{}) - if !ok { - payload = msg // Fallback for legacy format - } - // Broadcast all other messages to session listeners (UI and others) - sessionMsg := &SessionMessage{ - SessionID: conn.SessionID, - Type: msgType, - Timestamp: time.Now().UTC().Format(time.RFC3339), - Payload: payload, - } - Hub.broadcast <- sessionMsg - } - } - } -} - -// handleWebSocketPing sends periodic ping messages -func handleWebSocketPing(conn *SessionConnection) { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for range ticker.C { - // Lock write mutex before writing ping - conn.writeMu.Lock() - err := conn.Conn.WriteMessage(websocket.PingMessage, nil) - conn.writeMu.Unlock() - if err != nil { - return - } - } -} - -// GetSessionMessagesWS handles GET /projects/:projectName/sessions/:sessionId/messages -// Retrieves messages from S3 storage -func GetSessionMessagesWS(c *gin.Context) { - sessionID := c.Param("sessionId") - - // Access enforced by RBAC on downstream resources - - messages, err := retrieveMessagesFromS3(sessionID) - if err != nil { - log.Printf("getSessionMessagesWS: retrieve failed: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{ - "error": fmt.Sprintf("failed to retrieve messages: %v", err), - }) - return - } - - // Optional consolidation of partial messages - includeParam := strings.ToLower(strings.TrimSpace(c.Query("include_partial_messages"))) - includePartials := includeParam == "1" || includeParam == "true" || includeParam == "yes" - - collapsed := make([]SessionMessage, 0, len(messages)) - activePartialIndex := -1 - for _, m := range messages { - if m.Type == "message.partial" { - if includePartials { - if activePartialIndex >= 0 { - collapsed[activePartialIndex] = m - } else { - collapsed = append(collapsed, m) - activePartialIndex = len(collapsed) - 1 - } - } - // If not including partials, simply skip adding them - continue - } - // On any non-partial, clear active partial placeholder - activePartialIndex = -1 - collapsed = append(collapsed, m) - } - - c.JSON(http.StatusOK, gin.H{ - "sessionId": sessionID, - "messages": collapsed, - }) -} - -// PostSessionMessageWS handles POST /projects/:projectName/sessions/:sessionId/messages -// Accepts a generic JSON body. If a "type" string is provided, it will be used. -// Otherwise, defaults to "user_message" and wraps body under payload. -func PostSessionMessageWS(c *gin.Context) { - projectName := c.Param("projectName") - sessionID := c.Param("sessionId") - - var body map[string]interface{} - if err := c.BindJSON(&body); err != nil { - log.Printf("postSessionMessageWS: bind failed: %v", err) - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON body"}) - return - } - - msgType := "user_message" - if v, ok := body["type"].(string); ok && v != "" { - msgType = v - // Remove type from payload to avoid duplication - delete(body, "type") - } - - message := &SessionMessage{ - SessionID: sessionID, - Type: msgType, - Timestamp: time.Now().UTC().Format(time.RFC3339), - Payload: body, - } - - // Broadcast to session listeners (runner) and persist - Hub.broadcast <- message - - // Check if we should auto-generate a display name - // Only for user_message type (not control messages like interrupt/end_session) - if msgType == "user_message" { - go triggerDisplayNameGenerationIfNeeded(projectName, sessionID, body) - } - - c.JSON(http.StatusAccepted, gin.H{"status": "queued"}) -} - -// maxUserMessageChars is the maximum characters to include from user messages for display name generation -const maxUserMessageChars = 1000 - -// triggerDisplayNameGenerationIfNeeded checks if display name generation should be triggered -// and initiates it asynchronously. This runs in a goroutine to not block the response. -func triggerDisplayNameGenerationIfNeeded(projectName, sessionID string, messageBody map[string]interface{}) { - // Extract current user message content - currentContent, ok := messageBody["content"].(string) - if !ok || strings.TrimSpace(currentContent) == "" { - return - } - - // Get session to check if displayName is set and get context - session, err := getSessionForDisplayName(projectName, sessionID) - if err != nil { - log.Printf("DisplayNameGen: Failed to get session %s/%s: %v", projectName, sessionID, err) - return - } - - spec, ok := session["spec"].(map[string]interface{}) - if !ok { - return - } - - // Check if display name should be generated (only if empty/unset) - if !handlers.ShouldGenerateDisplayName(spec) { - return - } - - log.Printf("DisplayNameGen: Triggering generation for %s/%s", projectName, sessionID) - - // Collect all user messages (existing + current) for better context - combinedContent := collectUserMessages(sessionID, currentContent) - - // Extract session context for better name generation - sessionCtx := handlers.ExtractSessionContext(spec) - - // Trigger async display name generation - handlers.GenerateDisplayNameAsync(projectName, sessionID, combinedContent, sessionCtx) -} - -// collectUserMessages fetches existing user messages from storage and combines with current message -// Returns a truncated string of all user messages (max maxUserMessageChars) -func collectUserMessages(sessionID, currentMessage string) string { - // Fetch existing messages from storage - existingMessages, err := retrieveMessagesFromS3(sessionID) - if err != nil { - log.Printf("DisplayNameGen: Failed to retrieve messages for %s: %v", sessionID, err) - // Fall back to just the current message - return truncateString(currentMessage, maxUserMessageChars) - } - - // Collect user message contents - var userMessages []string - for _, msg := range existingMessages { - if msg.Type == "user_message" { - // Extract content from payload (Payload is already map[string]interface{}) - if content, ok := msg.Payload["content"].(string); ok && strings.TrimSpace(content) != "" { - userMessages = append(userMessages, strings.TrimSpace(content)) - } - } - } - - // Add current message - userMessages = append(userMessages, strings.TrimSpace(currentMessage)) - - // Combine with separator - combined := strings.Join(userMessages, " | ") - - // Truncate if too long - return truncateString(combined, maxUserMessageChars) -} - -// truncateString truncates a string to maxLen characters, adding "..." if truncated -func truncateString(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - if maxLen <= 3 { - return s[:maxLen] - } - return s[:maxLen-3] + "..." -} - -// getSessionForDisplayName retrieves session data for display name generation -func getSessionForDisplayName(projectName, sessionID string) (map[string]interface{}, error) { - if handlers.DynamicClient == nil { - return nil, fmt.Errorf("dynamic client not initialized") - } - - gvr := handlers.GetAgenticSessionV1Alpha1Resource() - item, err := handlers.DynamicClient.Resource(gvr).Namespace(projectName).Get( - context.Background(), sessionID, metav1.GetOptions{}, - ) - if err != nil { - return nil, err - } - - return item.Object, nil -} - -// NOTE: GetSessionMessagesClaudeFormat removed - session continuation now uses -// SDK's built-in resume functionality with persisted ~/.claude state -// See: https://docs.claude.com/en/api/agent-sdk/sessions diff --git a/components/backend/websocket/hub.go b/components/backend/websocket/hub.go deleted file mode 100644 index e0cc79338..000000000 --- a/components/backend/websocket/hub.go +++ /dev/null @@ -1,202 +0,0 @@ -// Package websocket provides real-time WebSocket communication for session updates. -package websocket - -import ( - "bytes" - "encoding/json" - "fmt" - "log" - "os" - "sync" - "time" - - "github.com/gorilla/websocket" -) - -// SessionWebSocketHub manages WebSocket connections for sessions -type SessionWebSocketHub struct { - // Map of sessionID -> SessionConnection pointers - sessions map[string]map[*SessionConnection]bool - // Register new connections - register chan *SessionConnection - // Unregister connections - unregister chan *SessionConnection - // Broadcast messages to session - broadcast chan *SessionMessage - mu sync.RWMutex -} - -// SessionConnection represents a WebSocket connection to a session -type SessionConnection struct { - SessionID string - Conn *websocket.Conn - UserID string - writeMu sync.Mutex // Protects concurrent writes to Conn -} - -// SessionMessage represents a message in a session -type SessionMessage struct { - SessionID string `json:"sessionId"` - Type string `json:"type"` - Timestamp string `json:"timestamp"` - Payload map[string]interface{} `json:"payload"` - // Partial message support - Partial *PartialMessageInfo `json:"partial,omitempty"` -} - -// PartialMessageInfo for fragmented messages -type PartialMessageInfo struct { - ID string `json:"id"` - Index int `json:"index"` - Total int `json:"total"` - Data string `json:"data"` -} - -// Package-level variables -var ( - Hub *SessionWebSocketHub - StateBaseDir string -) - -// Initialize WebSocket hub -func init() { - Hub = &SessionWebSocketHub{ - sessions: make(map[string]map[*SessionConnection]bool), - register: make(chan *SessionConnection), - unregister: make(chan *SessionConnection), - broadcast: make(chan *SessionMessage), - } - go Hub.run() -} - -// run starts the WebSocket hub -func (h *SessionWebSocketHub) run() { - for { - select { - case conn := <-h.register: - h.mu.Lock() - if h.sessions[conn.SessionID] == nil { - h.sessions[conn.SessionID] = make(map[*SessionConnection]bool) - } - h.sessions[conn.SessionID][conn] = true - h.mu.Unlock() - log.Printf("WebSocket connection registered for session %s", conn.SessionID) - - case conn := <-h.unregister: - h.mu.Lock() - if connections, exists := h.sessions[conn.SessionID]; exists { - if _, exists := connections[conn]; exists { - delete(connections, conn) - conn.Conn.Close() - if len(connections) == 0 { - delete(h.sessions, conn.SessionID) - } - } - } - h.mu.Unlock() - log.Printf("WebSocket connection unregistered for session %s", conn.SessionID) - - case message := <-h.broadcast: - h.mu.RLock() - connections := h.sessions[message.SessionID] - h.mu.RUnlock() - - if connections != nil { - messageData, _ := json.Marshal(message) - for sessionConn := range connections { - // Lock write mutex before writing - sessionConn.writeMu.Lock() - err := sessionConn.Conn.WriteMessage(websocket.TextMessage, messageData) - sessionConn.writeMu.Unlock() - if err != nil { - // Unregister in goroutine to avoid deadlock - hub select loop - // can only process one case at a time, so blocking send would hang - go func(conn *SessionConnection) { - h.unregister <- conn - }(sessionConn) - } - } - } - - // Also persist to S3 - go persistMessageToS3(message) - } - } -} - -// SendMessageToSession sends a message to all connections for a session -func SendMessageToSession(sessionID string, messageType string, payload map[string]interface{}) { - message := &SessionMessage{ - SessionID: sessionID, - Type: messageType, - Timestamp: time.Now().UTC().Format(time.RFC3339), - Payload: payload, - } - - Hub.broadcast <- message -} - -// SendPartialMessage sends a fragmented message to a session -func SendPartialMessage(sessionID string, partialID string, index, total int, data string) { - message := &SessionMessage{ - SessionID: sessionID, - Type: "message.partial", - Timestamp: time.Now().UTC().Format(time.RFC3339), - Payload: map[string]interface{}{}, - Partial: &PartialMessageInfo{ - ID: partialID, - Index: index, - Total: total, - Data: data, - }, - } - - Hub.broadcast <- message -} - -// Helper functions - -func persistMessageToS3(message *SessionMessage) { - // Write messages to per-project content service path as JSONL append for now - // Backend does not have project in this scope; persist to local state dir for durability - path := fmt.Sprintf("%s/sessions/%s/messages.jsonl", StateBaseDir, message.SessionID) - log.Printf("persistMessageToS3: path: %s", path) - b, _ := json.Marshal(message) - // Ensure dir - _ = os.MkdirAll(fmt.Sprintf("%s/sessions/%s", StateBaseDir, message.SessionID), 0o755) - f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) - if err != nil { - log.Printf("persistMessage: open failed: %v", err) - return - } - defer f.Close() - if _, err := f.Write(append(b, '\n')); err != nil { - log.Printf("persistMessage: write failed: %v", err) - } -} - -func retrieveMessagesFromS3(sessionID string) ([]SessionMessage, error) { - // Read from local state JSONL path for now - path := fmt.Sprintf("%s/sessions/%s/messages.jsonl", StateBaseDir, sessionID) - data, err := os.ReadFile(path) - if err != nil { - log.Printf("retrieveMessagesFromS3: read failed: %v", err) - if os.IsNotExist(err) { - return []SessionMessage{}, nil - } - return nil, err - } - lines := bytes.Split(data, []byte("\n")) - msgs := make([]SessionMessage, 0, len(lines)) - for _, line := range lines { - line = bytes.TrimSpace(line) - if len(line) == 0 { - continue - } - var m SessionMessage - if err := json.Unmarshal(line, &m); err == nil { - msgs = append(msgs, m) - } - } - return msgs, nil -} diff --git a/components/backend/websocket/legacy_translator.go b/components/backend/websocket/legacy_translator.go new file mode 100644 index 000000000..bd61d6c8f --- /dev/null +++ b/components/backend/websocket/legacy_translator.go @@ -0,0 +1,114 @@ +package websocket + +import ( + "ambient-code-backend/types" + "crypto/rand" + "encoding/hex" + "encoding/json" + "log" + "os" + "time" +) + +// MigrateLegacySessionToAGUI converts old message format to AG-UI events +// Creates a MESSAGES_SNAPSHOT from legacy messages and persists it +func MigrateLegacySessionToAGUI(sessionID string) error { + // Check if session has legacy messages (JSONL format) + legacyPath := StateBaseDir + "/sessions/" + sessionID + "/messages.jsonl" + data, err := os.ReadFile(legacyPath) + if err != nil { + if os.IsNotExist(err) { + // No legacy file, nothing to migrate + return nil + } + return err + } + + log.Printf("LegacyMigration: Found legacy messages.jsonl for %s, converting to AG-UI", sessionID) + + // Parse JSONL - each line is a complete message + var legacyMessages []map[string]interface{} + lines := splitLines(data) + for _, line := range lines { + if len(line) == 0 { + continue + } + var msg map[string]interface{} + if err := json.Unmarshal(line, &msg); err == nil { + legacyMessages = append(legacyMessages, msg) + } + } + + // Convert to AG-UI Message format + messages := make([]types.Message, 0) + + for _, legacyMsg := range legacyMessages { + msgType, _ := legacyMsg["type"].(string) + payload, _ := legacyMsg["payload"].(map[string]interface{}) + + switch msgType { + case "user_message": + content, _ := payload["content"].(string) + messages = append(messages, types.Message{ + ID: generateEventID(), + Role: types.RoleUser, + Content: content, + }) + + case "agent.message": + // Check if it's a text message + if content, ok := payload["content"].(map[string]interface{}); ok { + textType, _ := content["type"].(string) + if textType == "text_block" { + text, _ := content["text"].(string) + messages = append(messages, types.Message{ + ID: generateEventID(), + Role: types.RoleAssistant, + Content: text, + }) + } + } + // Tool calls will be reconstructed from tool_result pairs + + // system.message, agent.running, agent.waiting are not chat messages, skip + } + } + + if len(messages) == 0 { + log.Printf("LegacyMigration: No chat messages found in legacy file") + return nil + } + + log.Printf("LegacyMigration: Converted %d legacy messages to AG-UI format", len(messages)) + + // Create MESSAGES_SNAPSHOT event and persist it + snapshot := map[string]interface{}{ + "type": types.EventTypeMessagesSnapshot, + "threadId": sessionID, + "runId": "legacy-migration", + "timestamp": time.Now().UTC().Format(time.RFC3339Nano), + "messages": messages, + } + + // Persist to agui-events.jsonl + persistAGUIEventMap(sessionID, "legacy-migration", snapshot) + + log.Printf("LegacyMigration: Persisted MESSAGES_SNAPSHOT with %d messages", len(messages)) + + // Rename legacy file to indicate it's been migrated + migratedPath := legacyPath + ".migrated" + if err := os.Rename(legacyPath, migratedPath); err != nil { + log.Printf("LegacyMigration: Warning - failed to rename legacy file: %v", err) + } else { + log.Printf("LegacyMigration: Renamed %s to %s", legacyPath, migratedPath) + } + + return nil +} + +// generateEventID creates a random ID for events +func generateEventID() string { + b := make([]byte, 16) + rand.Read(b) + return hex.EncodeToString(b) +} diff --git a/components/frontend/Dockerfile b/components/frontend/Dockerfile index c656ac5ac..b2e566825 100644 --- a/components/frontend/Dockerfile +++ b/components/frontend/Dockerfile @@ -1,6 +1,14 @@ # Use Red Hat UBI Node.js 20 minimal image for dependencies FROM registry.access.redhat.com/ubi9/nodejs-20-minimal AS deps +# Build arguments for metadata +ARG GIT_COMMIT=unknown +ARG GIT_BRANCH=unknown +ARG GIT_REPO=unknown +ARG GIT_VERSION=unknown +ARG BUILD_DATE=unknown +ARG BUILD_USER=unknown + WORKDIR /app USER 0 @@ -12,6 +20,14 @@ RUN npm ci # Rebuild the source code only when needed FROM registry.access.redhat.com/ubi9/nodejs-20-minimal AS builder +# Build arguments (need to redeclare for each stage) +ARG GIT_COMMIT=unknown +ARG GIT_BRANCH=unknown +ARG GIT_REPO=unknown +ARG GIT_VERSION=unknown +ARG BUILD_DATE=unknown +ARG BUILD_USER=unknown + USER 0 WORKDIR /app @@ -25,17 +41,48 @@ COPY . . # Uncomment the following line in case you want to disable telemetry during the build. ENV NEXT_TELEMETRY_DISABLED=1 +# Make build metadata available to Next.js at build time +ENV NEXT_PUBLIC_GIT_COMMIT=${GIT_COMMIT} +ENV NEXT_PUBLIC_GIT_BRANCH=${GIT_BRANCH} +ENV NEXT_PUBLIC_GIT_REPO=${GIT_REPO} +ENV NEXT_PUBLIC_GIT_VERSION=${GIT_VERSION} +ENV NEXT_PUBLIC_BUILD_DATE=${BUILD_DATE} +ENV NEXT_PUBLIC_BUILD_USER=${BUILD_USER} + RUN npm run build # Production image, copy all the files and run next FROM registry.access.redhat.com/ubi9/nodejs-20-minimal AS runner +# Build arguments (need to redeclare for final stage) +ARG GIT_COMMIT=unknown +ARG GIT_BRANCH=unknown +ARG GIT_REPO=unknown +ARG GIT_VERSION=unknown +ARG BUILD_DATE=unknown +ARG BUILD_USER=unknown + +# Add labels to force cache invalidation and provide metadata +LABEL git.commit="${GIT_COMMIT}" +LABEL git.branch="${GIT_BRANCH}" +LABEL git.version="${GIT_VERSION}" +LABEL build.date="${BUILD_DATE}" +LABEL build.user="${BUILD_USER}" + WORKDIR /app ENV NODE_ENV=production # Uncomment the following line in case you want to disable telemetry during runtime. ENV NEXT_TELEMETRY_DISABLED=1 +# Build metadata as environment variables (NEXT_PUBLIC_ prefix makes them available to client) +ENV NEXT_PUBLIC_GIT_COMMIT=${GIT_COMMIT} +ENV NEXT_PUBLIC_GIT_BRANCH=${GIT_BRANCH} +ENV NEXT_PUBLIC_GIT_REPO=${GIT_REPO} +ENV NEXT_PUBLIC_GIT_VERSION=${GIT_VERSION} +ENV NEXT_PUBLIC_BUILD_DATE=${BUILD_DATE} +ENV NEXT_PUBLIC_BUILD_USER=${BUILD_USER} + # Copy public assets COPY --from=builder /app/public ./public diff --git a/components/frontend/next.config.js b/components/frontend/next.config.js index bbab259ea..5ee112947 100644 --- a/components/frontend/next.config.js +++ b/components/frontend/next.config.js @@ -1,6 +1,9 @@ /** @type {import('next').NextConfig} */ const nextConfig = { - output: 'standalone' + output: 'standalone', + experimental: { + instrumentationHook: true, + } } module.exports = nextConfig diff --git a/components/frontend/package-lock.json b/components/frontend/package-lock.json index 82f2f638f..78539d3ad 100644 --- a/components/frontend/package-lock.json +++ b/components/frontend/package-lock.json @@ -30,6 +30,7 @@ "lucide-react": "^0.542.0", "next": "15.5.9", "next-themes": "^0.4.6", + "python-struct": "^1.1.3", "react": "^19.1.0", "react-dom": "^19.1.0", "react-hook-form": "^7.62.0", @@ -6257,6 +6258,12 @@ "dev": true, "license": "MIT" }, + "node_modules/long": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", + "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==", + "license": "Apache-2.0" + }, "node_modules/longest-streak": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/longest-streak/-/longest-streak-3.1.0.tgz", @@ -8125,6 +8132,15 @@ "node": ">=6" } }, + "node_modules/python-struct": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/python-struct/-/python-struct-1.1.3.tgz", + "integrity": "sha512-UsI/mNvk25jRpGKYI38Nfbv84z48oiIWwG67DLVvjRhy8B/0aIK+5Ju5WOHgw/o9rnEmbAS00v4rgKFQeC332Q==", + "license": "MIT", + "dependencies": { + "long": "^4.0.0" + } + }, "node_modules/queue-microtask": { "version": "1.2.3", "resolved": "https://registry.npmjs.org/queue-microtask/-/queue-microtask-1.2.3.tgz", diff --git a/components/frontend/package.json b/components/frontend/package.json index 9deafaa57..7f0109453 100644 --- a/components/frontend/package.json +++ b/components/frontend/package.json @@ -31,6 +31,7 @@ "lucide-react": "^0.542.0", "next": "15.5.9", "next-themes": "^0.4.6", + "python-struct": "^1.1.3", "react": "^19.1.0", "react-dom": "^19.1.0", "react-hook-form": "^7.62.0", diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/events/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/events/route.ts new file mode 100644 index 000000000..d7f27ec53 --- /dev/null +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/events/route.ts @@ -0,0 +1,94 @@ +/** + * AG-UI SSE Events Proxy + * Proxies the backend AG-UI SSE stream through Next.js for Bearer auth compatibility. + * + * Browser EventSource cannot set Authorization headers, so we proxy through + * the Next.js same-origin API to inject auth headers server-side. + * + * See: https://docs.ag-ui.com/quickstart/introduction + */ + +import { BACKEND_URL } from '@/lib/config' +import { buildForwardHeadersAsync } from '@/lib/auth' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function GET( + request: Request, + { params }: { params: Promise<{ name: string; sessionName: string }> }, +) { + const { name, sessionName } = await params + const url = new URL(request.url) + const runId = url.searchParams.get('runId') || '' + + // Build auth headers from the incoming request + const headers = await buildForwardHeadersAsync(request) + + // Remove Content-Type as we're making a GET request for SSE + delete headers['Content-Type'] + + // Build backend URL + let backendUrl = `${BACKEND_URL}/projects/${encodeURIComponent(name)}/agentic-sessions/${encodeURIComponent(sessionName)}/agui/events` + if (runId) { + backendUrl += `?runId=${encodeURIComponent(runId)}` + } + + try { + // Fetch from backend SSE endpoint + const response = await fetch(backendUrl, { + method: 'GET', + headers: { + ...headers, + Accept: 'text/event-stream', + 'Cache-Control': 'no-cache', + }, + // @ts-expect-error - Node.js fetch supports duplex for streaming + duplex: 'half', + }) + + if (!response.ok) { + const errorText = await response.text() + return new Response(JSON.stringify({ error: errorText }), { + status: response.status, + headers: { 'Content-Type': 'application/json' }, + }) + } + + // Pipe the SSE stream through + const { readable, writable } = new TransformStream() + + // Forward the body in a non-blocking way + if (response.body) { + response.body.pipeTo(writable).catch((err) => { + // ResponseAborted is normal when client disconnects, don't log as error + if (err?.name !== 'AbortError' && !err?.message?.includes('ResponseAborted')) { + console.error('AG-UI SSE proxy pipe error:', err) + } + }) + } + + return new Response(readable, { + status: 200, + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-store, must-revalidate', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }, + }) + } catch (error) { + // Don't log ECONNREFUSED as error during backend restarts - it's expected + const isConnRefused = error && typeof error === 'object' && 'code' in error && error.code === 'ECONNREFUSED' + if (!isConnRefused) { + console.error('AG-UI SSE proxy error:', error) + } else { + console.log('Backend temporarily unavailable (ECONNREFUSED), client will retry') + } + return new Response( + JSON.stringify({ error: 'Failed to connect to AG-UI event stream' }), + { status: 503, headers: { 'Content-Type': 'application/json' } }, + ) + } +} + diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/history/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/history/route.ts new file mode 100644 index 000000000..f763a7a33 --- /dev/null +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/history/route.ts @@ -0,0 +1,36 @@ +/** + * AG-UI History Endpoint Proxy + * Returns compacted message history for a session. + * + * See: https://docs.ag-ui.com/concepts/serialization + */ + +import { BACKEND_URL } from '@/lib/config' +import { buildForwardHeadersAsync } from '@/lib/auth' + +export async function GET( + request: Request, + { params }: { params: Promise<{ name: string; sessionName: string }> }, +) { + const { name, sessionName } = await params + const url = new URL(request.url) + const runId = url.searchParams.get('runId') || '' + const headers = await buildForwardHeadersAsync(request) + + let backendUrl = `${BACKEND_URL}/projects/${encodeURIComponent(name)}/agentic-sessions/${encodeURIComponent(sessionName)}/agui/history` + if (runId) { + backendUrl += `?runId=${encodeURIComponent(runId)}` + } + + const resp = await fetch(backendUrl, { + method: 'GET', + headers, + }) + + const data = await resp.text() + return new Response(data, { + status: resp.status, + headers: { 'Content-Type': 'application/json' }, + }) +} + diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/interrupt/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/interrupt/route.ts new file mode 100644 index 000000000..25fe0d5c5 --- /dev/null +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/interrupt/route.ts @@ -0,0 +1,39 @@ +/** + * AG-UI Interrupt Endpoint Proxy + * Forwards interrupt signal to backend to stop Claude SDK execution. + * + * See: https://platform.claude.com/docs/en/agent-sdk/python#methods + */ + +import { BACKEND_URL } from '@/lib/config' +import { buildForwardHeadersAsync } from '@/lib/auth' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function POST( + request: Request, + { params }: { params: Promise<{ name: string; sessionName: string }> }, +) { + const { name, sessionName } = await params + const headers = await buildForwardHeadersAsync(request) + const body = await request.text() + + const backendUrl = `${BACKEND_URL}/projects/${encodeURIComponent(name)}/agentic-sessions/${encodeURIComponent(sessionName)}/agui/interrupt` + + const resp = await fetch(backendUrl, { + method: 'POST', + headers: { + ...headers, + 'Content-Type': 'application/json', + }, + body, + }) + + const data = await resp.text() + return new Response(data, { + status: resp.status, + headers: { 'Content-Type': 'application/json' }, + }) +} + diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/run/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/run/route.ts new file mode 100644 index 000000000..f3f303358 --- /dev/null +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/run/route.ts @@ -0,0 +1,41 @@ +/** + * AG-UI Run Endpoint Proxy + * Creates a new agent run and returns metadata immediately. + * Events are broadcast to GET /agui/events subscribers (middleware pattern). + * + * See: https://docs.ag-ui.com/concepts/architecture + */ + +import { BACKEND_URL } from '@/lib/config' +import { buildForwardHeadersAsync } from '@/lib/auth' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +export async function POST( + request: Request, + { params }: { params: Promise<{ name: string; sessionName: string }> }, +) { + const { name, sessionName } = await params + const headers = await buildForwardHeadersAsync(request) + const body = await request.text() + + const backendUrl = `${BACKEND_URL}/projects/${encodeURIComponent(name)}/agentic-sessions/${encodeURIComponent(sessionName)}/agui/run` + + const resp = await fetch(backendUrl, { + method: 'POST', + headers: { + ...headers, + 'Content-Type': 'application/json', + }, + body, + }) + + // Backend returns JSON metadata immediately (not SSE stream) + // Events are broadcast to GET /agui/events subscribers + const data = await resp.text() + return new Response(data, { + status: resp.status, + headers: { 'Content-Type': 'application/json' }, + }) +} diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/runs/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/runs/route.ts new file mode 100644 index 000000000..344c7e9fa --- /dev/null +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/agui/runs/route.ts @@ -0,0 +1,31 @@ +/** + * AG-UI Runs Endpoint Proxy + * Returns list of runs for a session (thread). + * + * See: https://docs.ag-ui.com/concepts/serialization + */ + +import { BACKEND_URL } from '@/lib/config' +import { buildForwardHeadersAsync } from '@/lib/auth' + +export async function GET( + request: Request, + { params }: { params: Promise<{ name: string; sessionName: string }> }, +) { + const { name, sessionName } = await params + const headers = await buildForwardHeadersAsync(request) + + const backendUrl = `${BACKEND_URL}/projects/${encodeURIComponent(name)}/agentic-sessions/${encodeURIComponent(sessionName)}/agui/runs` + + const resp = await fetch(backendUrl, { + method: 'GET', + headers, + }) + + const data = await resp.text() + return new Response(data, { + status: resp.status, + headers: { 'Content-Type': 'application/json' }, + }) +} + diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/messages/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/messages/route.ts deleted file mode 100644 index 977ce9741..000000000 --- a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/messages/route.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { BACKEND_URL } from '@/lib/config' -import { buildForwardHeadersAsync } from '@/lib/auth' - -export async function GET( - request: Request, - { params }: { params: Promise<{ name: string; sessionName: string }> }, -) { - const { name, sessionName } = await params - const headers = await buildForwardHeadersAsync(request) - const resp = await fetch(`${BACKEND_URL}/projects/${encodeURIComponent(name)}/sessions/${encodeURIComponent(sessionName)}/messages`, { - method: 'GET', - headers, - }) - const data = await resp.text() - return new Response(data, { status: resp.status, headers: { 'Content-Type': 'application/json' } }) -} - -export async function POST( - request: Request, - { params }: { params: Promise<{ name: string; sessionName: string }> }, -) { - const { name, sessionName } = await params - const headers = await buildForwardHeadersAsync(request) - const body = await request.text() - const resp = await fetch(`${BACKEND_URL}/projects/${encodeURIComponent(name)}/sessions/${encodeURIComponent(sessionName)}/messages`, { - method: 'POST', - headers: { ...headers, 'Content-Type': 'application/json' }, - body, - }) - const data = await resp.text() - return new Response(data, { status: resp.status, headers: { 'Content-Type': 'application/json' } }) -} diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/workspace/upload/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/workspace/upload/route.ts index 298026029..a0abc32ff 100644 --- a/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/workspace/upload/route.ts +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/[sessionName]/workspace/upload/route.ts @@ -256,9 +256,6 @@ async function compressImageIfNeeded( } const finalSize = compressed.byteLength; - console.log( - `Compressed ${contentType} image: ${originalSize} bytes -> ${finalSize} bytes (${Math.round((finalSize / originalSize) * 100)}%)` - ); // Convert Node.js Buffer to ArrayBuffer by creating a new ArrayBuffer and copying data const arrayBuffer = new ArrayBuffer(finalSize); diff --git a/components/frontend/src/app/api/projects/[name]/agentic-sessions/route.ts b/components/frontend/src/app/api/projects/[name]/agentic-sessions/route.ts index 7bef7fd5c..1f6a43680 100644 --- a/components/frontend/src/app/api/projects/[name]/agentic-sessions/route.ts +++ b/components/frontend/src/app/api/projects/[name]/agentic-sessions/route.ts @@ -31,14 +31,6 @@ export async function POST( const body = await request.text(); const headers = await buildForwardHeadersAsync(request); - console.log('[API Route] Creating session for project:', name); - console.log('[API Route] Auth headers present:', { - hasUser: !!headers['X-Forwarded-User'], - hasUsername: !!headers['X-Forwarded-Preferred-Username'], - hasToken: !!headers['X-Forwarded-Access-Token'], - hasEmail: !!headers['X-Forwarded-Email'], - }); - const response = await fetch(`${BACKEND_URL}/projects/${encodeURIComponent(name)}/agentic-sessions`, { method: 'POST', headers, @@ -46,7 +38,6 @@ export async function POST( }); const text = await response.text(); - console.log('[API Route] Backend response status:', response.status); if (!response.ok) { console.error('[API Route] Backend error:', text); } diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts index 69567908d..654823911 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts @@ -48,17 +48,8 @@ export function useWorkflowManagement({ throw new Error(errorData.error || "Failed to update workflow"); } - // 2. Send WebSocket message to trigger workflow clone and restart - await fetch(`/api/projects/${projectName}/agentic-sessions/${sessionName}/messages`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - type: "workflow_change", - gitUrl: pendingWorkflow.gitUrl, - branch: pendingWorkflow.branch, - path: pendingWorkflow.path || "", - }), - }); + // Note: Workflow clone and restart handled by operator + // Initial workflow prompt auto-executed via AG-UI pattern (POST /agui/run) successToast(`Activating workflow: ${pendingWorkflow.name}`); setActiveWorkflow(pendingWorkflow.id); diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/lib/message-adapter.ts b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/lib/message-adapter.ts deleted file mode 100644 index e5d346824..000000000 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/lib/message-adapter.ts +++ /dev/null @@ -1,228 +0,0 @@ -import type { SessionMessage } from "@/types"; -import type { MessageObject, ToolUseMessages } from "@/types/agentic-session"; -import type { RawWireMessage, InnerEnvelope, ToolUseBlockWithTimestamp, ToolResultBlockWithTimestamp } from "./types"; - -/** - * Converts raw wire messages from the backend into structured MessageObject and ToolUseMessages - * for display in the UI. This handles all the complex message parsing and transformation logic. - */ -export function adaptSessionMessages( - messages: SessionMessage[], - isInteractive: boolean = false -): Array { - try { - const toolUseBlocks: ToolUseBlockWithTimestamp[] = []; - const toolResultBlocks: ToolResultBlockWithTimestamp[] = []; - const agenticMessages: MessageObject[] = []; - - for (const raw of messages as RawWireMessage[]) { - const envelope: InnerEnvelope = ((raw?.payload as InnerEnvelope) ?? (raw as unknown as InnerEnvelope)) || {}; - const innerType: string = (raw as unknown as InnerEnvelope)?.type || envelope.type || ""; - const innerTs: string = raw?.timestamp || envelope.timestamp || new Date().toISOString(); - const payloadValue = envelope.payload; - const innerPayload: Record = (payloadValue && typeof payloadValue === 'object' && !Array.isArray(payloadValue)) - ? (payloadValue as Record) - : ((typeof envelope === 'object' ? (envelope as unknown as Record) : {}) as Record); - const partial = (envelope.partial as InnerEnvelope["partial"]) || ((raw as unknown as { partial?: InnerEnvelope["partial"] })?.partial) || undefined; - - switch (innerType) { - case "message.partial": { - const text = partial?.data || ""; - if (text) { - agenticMessages.push({ - type: "agent_message", - content: { type: "text_block", text }, - model: "claude", - timestamp: innerTs, - }); - } - break; - } - case "agent.message": { - if (partial?.data) { - const text = String(partial.data || ""); - if (text) { - agenticMessages.push({ - type: "agent_message", - content: { type: "text_block", text }, - model: "claude", - timestamp: innerTs, - }); - break; - } - } - - const toolName = (innerPayload?.tool as string | undefined); - const toolInput = (innerPayload?.input as Record | undefined) || {}; - const providedId = (innerPayload?.id as string | undefined); - const result = innerPayload?.tool_result as unknown as { tool_use_id?: string; content?: unknown; is_error?: boolean } | undefined; - - if (toolName) { - const id = providedId ? String(providedId) : String(envelope?.seq ?? `${toolName}-${toolUseBlocks.length}`); - toolUseBlocks.push({ - block: { type: "tool_use_block", id, name: toolName, input: toolInput }, - timestamp: innerTs, - }); - } else if (result?.tool_use_id) { - toolResultBlocks.push({ - block: { - type: "tool_result_block", - tool_use_id: String(result.tool_use_id), - content: (result.content as string | Array> | null | undefined) ?? null, - is_error: Boolean(result.is_error), - }, - timestamp: innerTs, - }); - } else if ((innerPayload as Record)?.type === 'result.message') { - let rp: Record = (innerPayload.payload as Record) || {}; - if (rp && typeof rp === 'object' && 'payload' in rp && rp.payload && typeof rp.payload === 'object') { - rp = rp.payload as Record; - } - agenticMessages.push({ - type: "result_message", - subtype: String(rp.subtype || ""), - duration_ms: Number(rp.duration_ms || 0), - duration_api_ms: Number(rp.duration_api_ms || 0), - is_error: Boolean(rp.is_error || false), - num_turns: Number(rp.num_turns || 0), - session_id: String(rp.session_id || ""), - total_cost_usd: (typeof rp.total_cost_usd === 'number' ? rp.total_cost_usd : null), - usage: (typeof rp.usage === 'object' && rp.usage ? rp.usage as Record : null), - result: (typeof rp.result === 'string' ? rp.result : null), - timestamp: innerTs, - }); - if (typeof rp.result === 'string' && rp.result.trim()) { - agenticMessages.push({ - type: "agent_message", - content: { type: "text_block", text: String(rp.result) }, - model: "claude", - timestamp: innerTs, - }); - } - } else { - const envelopePayload = envelope.payload; - const contentText = (innerPayload.content as Record | undefined)?.text; - const messageText = innerPayload.message; - const nestedContentText = (innerPayload.payload as Record | undefined)?.content as Record | undefined; - const text = (typeof envelopePayload === 'string') - ? String(envelopePayload) - : ( - (typeof contentText === 'string' ? String(contentText) : undefined) - || (typeof messageText === 'string' ? String(messageText) : undefined) - || (typeof nestedContentText?.text === 'string' ? String(nestedContentText.text) : '') - ); - if (text) { - agenticMessages.push({ - type: "agent_message", - content: { type: "text_block", text }, - model: "claude", - timestamp: innerTs, - }); - } - } - break; - } - case "system.message": { - let text = ""; - let isDebug = false; - - // The envelope object might have message/payload at different levels - // Try envelope.payload first, then fall back to envelope itself - const envelopeObj = envelope as { message?: string; payload?: string | { message?: string; payload?: string; debug?: boolean }; debug?: boolean }; - - // Check if envelope.payload is a string - if (typeof envelopeObj.payload === 'string') { - text = envelopeObj.payload; - } - // Check if envelope.payload is an object with message or payload - else if (typeof envelopeObj.payload === 'object' && envelopeObj.payload !== null) { - const payloadObj = envelopeObj.payload as { message?: string; payload?: string; debug?: boolean }; - text = payloadObj.message || (typeof payloadObj.payload === 'string' ? payloadObj.payload : ""); - isDebug = payloadObj.debug === true; - } - // Fall back to envelope.message directly - else if (typeof envelopeObj.message === 'string') { - text = envelopeObj.message; - } - - if (envelopeObj.debug === true) { - isDebug = true; - } - - // Always create a system message - show the raw envelope if we couldn't extract text - agenticMessages.push({ - type: "system_message", - subtype: "system.message", - data: { - message: text || `[system event: ${JSON.stringify(envelope)}]`, - debug: isDebug - }, - timestamp: innerTs, - }); - break; - } - case "user.message": - case "user_message": { - const text = (innerPayload?.content as string | undefined) || ""; - if (text) { - agenticMessages.push({ - type: "user_message", - content: { type: "text_block", text }, - timestamp: innerTs, - }); - } - break; - } - case "agent.running": { - agenticMessages.push({ type: "agent_running", timestamp: innerTs }); - break; - } - case "agent.waiting": { - agenticMessages.push({ type: "agent_waiting", timestamp: innerTs }); - break; - } - default: { - agenticMessages.push({ - type: "system_message", - subtype: innerType || "unknown", - data: innerPayload || {}, - timestamp: innerTs, - }); - } - } - } - - const toolUseMessages: ToolUseMessages[] = []; - for (const tu of toolUseBlocks) { - const match = toolResultBlocks.find((tr) => tr.block.tool_use_id === tu.block.id); - if (match) { - toolUseMessages.push({ - type: "tool_use_messages", - timestamp: tu.timestamp, - toolUseBlock: tu.block, - resultBlock: match.block, - }); - } else { - toolUseMessages.push({ - type: "tool_use_messages", - timestamp: tu.timestamp, - toolUseBlock: tu.block, - resultBlock: { type: "tool_result_block", tool_use_id: tu.block.id, content: null, is_error: false }, - }); - } - } - - const all = [...agenticMessages, ...toolUseMessages]; - const sorted = all.sort((a, b) => { - const at = new Date(a.timestamp || 0).getTime(); - const bt = new Date(b.timestamp || 0).getTime(); - return at - bt; - }); - - return isInteractive ? sorted.filter((m) => m.type !== "result_message") : sorted; - } catch (error) { - console.error('Failed to adapt session messages:', error); - return []; // Return empty array on error - } -} - diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx index a6a46aee5..8c964eeb2 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx @@ -78,20 +78,19 @@ import { McpIntegrationsAccordion } from "./components/accordions/mcp-integratio import { useGitOperations } from "./hooks/use-git-operations"; import { useWorkflowManagement } from "./hooks/use-workflow-management"; import { useFileOperations } from "./hooks/use-file-operations"; -import { adaptSessionMessages } from "./lib/message-adapter"; import type { DirectoryOption, DirectoryRemote } from "./lib/types"; -import type { SessionMessage } from "@/types"; -import type { MessageObject, ToolUseMessages } from "@/types/agentic-session"; +import type { MessageObject, ToolUseMessages, HierarchicalToolMessage } from "@/types/agentic-session"; +import type { AGUIToolCall } from "@/types/agui"; + +// AG-UI streaming +import { useAGUIStream } from "@/hooks/use-agui-stream"; // React Query hooks import { useSession, - useSessionMessages, useStopSession, useDeleteSession, - useSendChatMessage, - useSendControlMessage, useSessionK8sResources, useContinueSession, } from "@/services/queries"; @@ -191,11 +190,6 @@ export default function ProjectSessionDetailPage({ error, refetch: refetchSession, } = useSession(projectName, sessionName); - const { data: messages = [] } = useSessionMessages( - projectName, - sessionName, - session?.status?.phase, - ); const { data: k8sResources } = useSessionK8sResources( projectName, sessionName, @@ -203,8 +197,60 @@ export default function ProjectSessionDetailPage({ const stopMutation = useStopSession(); const deleteMutation = useDeleteSession(); const continueMutation = useContinueSession(); - const sendChatMutation = useSendChatMessage(); - const sendControlMutation = useSendControlMessage(); + + // AG-UI streaming hook - replaces useSessionMessages and useSendChatMessage + // Note: autoConnect is intentionally false to avoid SSR hydration mismatch + // Connection is triggered manually in useEffect after client hydration + const aguiStream = useAGUIStream({ + projectName: projectName || "", + sessionName: sessionName || "", + autoConnect: false, // Manual connection after hydration + onError: (err) => console.error("AG-UI stream error:", err), + }); + const aguiState = aguiStream.state; + const aguiSendMessage = aguiStream.sendMessage; + const aguiInterrupt = aguiStream.interrupt; + const isRunActive = aguiStream.isRunActive; + const aguiConnectRef = useRef(aguiStream.connect); + + // Keep connect ref up to date + useEffect(() => { + aguiConnectRef.current = aguiStream.connect; + }, [aguiStream.connect]); + + // Connect to AG-UI event stream for history and live updates + // AG-UI pattern: GET /agui/events streams ALL thread events (past + future) + // POST /agui/run creates runs, events broadcast to GET subscribers + const hasConnectedRef = useRef(false); + useEffect(() => { + if (!projectName || !sessionName) return; + + // Connect once on mount and keep connection open + if (!hasConnectedRef.current) { + hasConnectedRef.current = true; + aguiConnectRef.current(); + } + }, [projectName, sessionName]); + + // Auto-send initial prompt (handles session start, workflow activation, restarts) + // AG-UI pattern: Client (or backend) initiates runs via POST /agui/run + const lastProcessedPromptRef = useRef(""); + + useEffect(() => { + if (!session || !aguiSendMessage) return; + + const initialPrompt = session?.spec?.initialPrompt; + + // NOTE: Initial prompt execution handled by backend auto-trigger (StartSession handler) + // Backend waits for subscriber before executing, ensuring events are received + // This works for both UI and headless/API usage + + // Track that we've seen this prompt (for workflow changes) + if (initialPrompt && lastProcessedPromptRef.current !== initialPrompt) { + lastProcessedPromptRef.current = initialPrompt; + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [session?.spec?.initialPrompt, session?.status?.phase, aguiState.messages.length, aguiState.status]); // Workflow management hook const workflowManagement = useWorkflowManagement({ @@ -460,10 +506,10 @@ export default function ProjectSessionDetailPage({ // Track when first message loads useEffect(() => { - if (messages && messages.length > 0 && !firstMessageLoaded) { + if (aguiState.messages && aguiState.messages.length > 0 && !firstMessageLoaded) { setFirstMessageLoaded(true); } - }, [messages, firstMessageLoaded]); + }, [aguiState.messages, firstMessageLoaded]); // Load active workflow and remotes from session useEffect(() => { @@ -548,13 +594,365 @@ export default function ProjectSessionDetailPage({ ); }; - // Convert messages using extracted adapter - const streamMessages: Array = useMemo(() => { - return adaptSessionMessages( - messages as SessionMessage[], - session?.spec?.interactive || false, - ); - }, [messages, session?.spec?.interactive]); + // Convert AG-UI messages to display format with hierarchical tool call rendering + const streamMessages: Array = useMemo(() => { + + // Helper function to parse tool arguments + const parseToolArgs = (args: string | undefined): Record => { + if (!args) return {}; + try { + const parsed = JSON.parse(args); + if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) { + return parsed as Record; + } + return { value: parsed }; + } catch { + return { _raw: String(args || '') }; + } + }; + + // Helper function to create a tool message from a tool call + const createToolMessage = ( + tc: AGUIToolCall, + timestamp: string + ): ToolUseMessages => { + const toolInput = parseToolArgs(tc.args); + return { + type: "tool_use_messages", + timestamp, + toolUseBlock: { + type: "tool_use_block", + id: tc.id, + name: tc.name, + input: toolInput, + }, + resultBlock: { + type: "tool_result_block", + tool_use_id: tc.id, + content: tc.result || null, + is_error: tc.status === "error", + }, + }; + }; + + const result: Array = []; + + // Phase A: Collect all tool calls from all messages for hierarchy building + const allToolCalls = new Map(); + + for (const msg of aguiState.messages) { + const timestamp = msg.timestamp || new Date().toISOString(); + + if (msg.toolCalls && Array.isArray(msg.toolCalls)) { + for (const tc of msg.toolCalls) { + if (tc && tc.id && tc.name) { + allToolCalls.set(tc.id, { tc, timestamp }); + } + } + } + } + + // Add currently streaming tool call to the map if present + // This ensures streaming tools (both parents and children) are included in hierarchy + // CRITICAL: Don't require name - add even if name is null to prevent orphaned children + if (aguiState.currentToolCall?.id) { + const streamingToolId = aguiState.currentToolCall.id; + const streamingParentId = aguiState.currentToolCall.parentToolUseId; + const toolName = aguiState.currentToolCall.name || "unknown_tool"; // Default if null + + // Create a pseudo-tool-call for the streaming tool + const streamingTC: AGUIToolCall = { + id: streamingToolId, + name: toolName, + args: aguiState.currentToolCall.args || "", + type: "function", + parentToolUseId: streamingParentId, + status: "running", + }; + + if (!allToolCalls.has(streamingToolId)) { + allToolCalls.set(streamingToolId, { + tc: streamingTC, + timestamp: new Date().toISOString() + }); + } + } + + // Add pending children to render map so they show during streaming! + // These are children that finished before their parent tool finished + if (aguiState.pendingChildren && aguiState.pendingChildren.size > 0) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for (const [parentId, children] of aguiState.pendingChildren.entries()) { + for (const childMsg of children) { + if (childMsg.toolCalls) { + for (const tc of childMsg.toolCalls) { + if (!allToolCalls.has(tc.id)) { + allToolCalls.set(tc.id, { + tc: tc, + timestamp: new Date().toISOString(), + }); + } + } + } + } + } + } + + // Phase B: Build parent-child relationships + const topLevelTools = new Set(); + const childrenByParent = new Map(); + + for (const [toolId, { tc }] of allToolCalls) { + if (tc.parentToolUseId) { + // This is a child tool call + if (!childrenByParent.has(tc.parentToolUseId)) { + childrenByParent.set(tc.parentToolUseId, []); + } + childrenByParent.get(tc.parentToolUseId)!.push(toolId); + } else { + // This is a top-level tool call + topLevelTools.add(toolId); + } + } + + // Handle orphaned children - but DON'T promote to top-level if parent is streaming + for (const [toolId, { tc }] of allToolCalls) { + if (tc.parentToolUseId && !allToolCalls.has(tc.parentToolUseId)) { + // Check if parent is the currently streaming tool + if (aguiState.currentToolCall?.id === tc.parentToolUseId) { + // Don't promote to top-level - parent is streaming and will appear + } else { + // Parent truly not found, render as top-level (fallback) + console.warn(` ⚠️ Orphaned child: ${tc.name} (${toolId.substring(0, 8)}) - parent ${tc.parentToolUseId.substring(0, 8)} not found`); + topLevelTools.add(toolId); + } + } + } + + // Track which tool calls we've already rendered + const renderedToolCalls = new Set(); + + // Phase C: Process messages and build hierarchical structure + for (const msg of aguiState.messages) { + const timestamp = msg.timestamp || new Date().toISOString(); + + // Handle text content by role + if (msg.role === "user") { + result.push({ + type: "user_message", + content: { type: "text_block", text: msg.content || "" }, + timestamp, + }); + } else if (msg.role === "assistant") { + // Check if this is a thinking block (from RAW event) + const metadata = msg.metadata as Record | undefined; + if (metadata?.type === "thinking_block") { + result.push({ + type: "agent_message", + content: { + type: "thinking_block", + thinking: metadata.thinking as string || "", + signature: metadata.signature as string || "", + }, + model: "claude", + timestamp, + }); + } else if (msg.content) { + // Only push text message if there's actual content + result.push({ + type: "agent_message", + content: { type: "text_block", text: msg.content }, + model: "claude", + timestamp, + }); + } + } else if (msg.role === "tool") { + // Standalone tool results (not from toolCalls array) + if (msg.toolCallId && !allToolCalls.has(msg.toolCallId)) { + result.push({ + type: "tool_use_messages", + timestamp, + toolUseBlock: { + type: "tool_use_block", + id: msg.toolCallId, + name: msg.name || "tool", + input: {}, + }, + resultBlock: { + type: "tool_result_block", + tool_use_id: msg.toolCallId, + content: msg.content || null, + is_error: false, + }, + }); + } + } else if (msg.role === "system") { + result.push({ + type: "system_message", + subtype: "system.message", + data: { message: msg.content || "" }, + timestamp, + }); + } + + // Handle tool calls embedded in this message + if (msg.toolCalls && Array.isArray(msg.toolCalls)) { + for (const tc of msg.toolCalls) { + if (!tc || !tc.id || !tc.name) continue; + + // Skip if already rendered or if it's a child (will be rendered inside parent) + if (renderedToolCalls.has(tc.id)) { + continue; + } + if (!topLevelTools.has(tc.id)) { + continue; + } + + // Build children array for this tool call + const childIds = childrenByParent.get(tc.id) || []; + + const children: ToolUseMessages[] = childIds + .map(childId => { + const childData = allToolCalls.get(childId); + if (!childData) return null; + renderedToolCalls.add(childId); + return createToolMessage(childData.tc, childData.timestamp); + }) + .filter((c): c is ToolUseMessages => c !== null); + + // Create the hierarchical tool message + const toolInput = parseToolArgs(tc.args); + + const toolMessage: HierarchicalToolMessage = { + type: "tool_use_messages", + timestamp, + toolUseBlock: { + type: "tool_use_block", + id: tc.id, + name: tc.name, + input: toolInput, + }, + resultBlock: { + type: "tool_result_block", + tool_use_id: tc.id, + content: tc.result || null, + is_error: tc.status === "error", + }, + children: children.length > 0 ? children : undefined, + }; + + result.push(toolMessage); + renderedToolCalls.add(tc.id); + } + } + } + + // Add streaming message if currently streaming + if (aguiState.currentMessage?.content) { + result.push({ + type: "agent_message", + content: { type: "text_block", text: aguiState.currentMessage.content }, + model: "claude", + timestamp: new Date().toISOString(), + streaming: true, + } as MessageObject & { streaming?: boolean }); + } + + // Render ALL currently streaming tool calls (supports parallel tool execution) + // CRITICAL: This renders tools immediately when TOOL_CALL_START arrives, + // not waiting until TOOL_CALL_END like the allToolCalls map approach does + const pendingToolCalls = aguiState.pendingToolCalls || new Map(); + + for (const [toolId, pendingTool] of pendingToolCalls) { + if (renderedToolCalls.has(toolId)) continue; + + const toolName = pendingTool.name || "unknown_tool"; + const toolArgs = pendingTool.args || ""; + const streamingParentId = pendingTool.parentToolUseId; + + // Only render if this is a top-level tool (not a child waiting for parent) + // Children will be rendered nested inside their parent + const isTopLevel = !streamingParentId || !pendingToolCalls.has(streamingParentId); + + if (isTopLevel) { + const toolInput = parseToolArgs(toolArgs); + + // Get any pending children for this tool (children that finished before parent) + const pendingForThis = aguiState.pendingChildren?.get(toolId) || []; + const children: ToolUseMessages[] = pendingForThis + .map(childMsg => { + const childTC = childMsg.toolCalls?.[0]; + if (!childTC) return null; + return createToolMessage(childTC, new Date().toISOString()); + }) + .filter((c): c is ToolUseMessages => c !== null); + + // Also include any streaming children from pendingToolCalls + for (const [childId, childTool] of pendingToolCalls) { + if (childTool.parentToolUseId === toolId && !renderedToolCalls.has(childId)) { + const childInput = parseToolArgs(childTool.args || ""); + children.push({ + type: "tool_use_messages", + timestamp: new Date().toISOString(), + toolUseBlock: { + type: "tool_use_block", + id: childId, + name: childTool.name, + input: childInput, + }, + resultBlock: { + type: "tool_result_block", + tool_use_id: childId, + content: null, // Still streaming + is_error: false, + }, + }); + renderedToolCalls.add(childId); + } + } + + // Also include any children from the childrenByParent map + const childIds = childrenByParent.get(toolId) || []; + for (const childId of childIds) { + if (renderedToolCalls.has(childId)) continue; + const childData = allToolCalls.get(childId); + if (childData) { + children.push(createToolMessage(childData.tc, childData.timestamp)); + renderedToolCalls.add(childId); + } + } + + const streamingToolMessage: HierarchicalToolMessage = { + type: "tool_use_messages", + timestamp: new Date().toISOString(), + toolUseBlock: { + type: "tool_use_block", + id: toolId, + name: toolName, + input: toolInput, + }, + resultBlock: { + type: "tool_result_block", + tool_use_id: toolId, + content: null, // No result yet - still running! + is_error: false, + }, + children: children.length > 0 ? children : undefined, + }; + + result.push(streamingToolMessage); + renderedToolCalls.add(toolId); + } + } + + return result; + }, [ + aguiState.messages, + aguiState.currentMessage, + aguiState.currentToolCall, + aguiState.pendingToolCalls, // CRITICAL: Include so UI updates when new tools start + aguiState.pendingChildren, // CRITICAL: Include so UI updates when children finish + ]); // Auto-refresh artifacts when messages complete // UX improvement: Automatically refresh the artifacts panel when Claude writes new files, @@ -682,58 +1080,35 @@ export default function ProjectSessionDetailPage({ ); }; - const sendChat = () => { + const sendChat = async () => { if (!chatInput.trim()) return; const finalMessage = chatInput.trim(); + setChatInput(""); - sendChatMutation.mutate( - { projectName, sessionName, content: finalMessage }, - { - onSuccess: () => { - setChatInput(""); - }, - onError: (err) => - errorToast( - err instanceof Error ? err.message : "Failed to send message", - ), - }, - ); + try { + await aguiSendMessage(finalMessage); + } catch (err) { + errorToast(err instanceof Error ? err.message : "Failed to send message"); + } }; - const handleCommandClick = (slashCommand: string) => { - const finalMessage = slashCommand; - - sendChatMutation.mutate( - { projectName, sessionName, content: finalMessage }, - { - onSuccess: () => { - successToast(`Command ${slashCommand} sent`); - }, - onError: (err) => - errorToast( - err instanceof Error ? err.message : "Failed to send command", - ), - }, - ); + const handleCommandClick = async (slashCommand: string) => { + try { + await aguiSendMessage(slashCommand); + successToast(`Command ${slashCommand} sent`); + } catch (err) { + errorToast(err instanceof Error ? err.message : "Failed to send command"); + } }; - const handleInterrupt = () => { - sendControlMutation.mutate( - { projectName, sessionName, type: "interrupt" }, - { - onSuccess: () => successToast("Agent interrupted"), - onError: (err) => - errorToast( - err instanceof Error ? err.message : "Failed to interrupt agent", - ), - }, - ); - }; + // LEGACY: Old handleInterrupt removed - now using aguiInterrupt from useAGUIStream + // which calls the proper AG-UI interrupt endpoint that signals Claude SDK const handleEndSession = () => { - sendControlMutation.mutate( - { projectName, sessionName, type: "end_session" }, + // Use stop API to end the session + stopMutation.mutate( + { projectName, sessionName, data: { reason: "end_session" } }, { onSuccess: () => successToast("Session ended successfully"), onError: (err) => @@ -873,7 +1248,7 @@ export default function ProjectSessionDetailPage({ onDelete={handleDelete} durationMs={durationMs} k8sResources={k8sResources} - messageCount={messages.length} + messageCount={aguiState.messages.length} renderMode="kebab-only" /> @@ -1445,12 +1820,13 @@ export default function ProjectSessionDetailPage({ chatInput={chatInput} setChatInput={setChatInput} onSendChat={() => Promise.resolve(sendChat())} - onInterrupt={() => Promise.resolve(handleInterrupt())} + onInterrupt={aguiInterrupt} onEndSession={() => Promise.resolve(handleEndSession())} onGoToResults={() => {}} onContinue={handleContinue} workflowMetadata={workflowMetadata} onCommandClick={handleCommandClick} + isRunActive={isRunActive} /> diff --git a/components/frontend/src/components/session/MessagesTab.tsx b/components/frontend/src/components/session/MessagesTab.tsx index 0fc7e6f50..014defb4e 100644 --- a/components/frontend/src/components/session/MessagesTab.tsx +++ b/components/frontend/src/components/session/MessagesTab.tsx @@ -6,6 +6,7 @@ import { Badge } from "@/components/ui/badge"; import { Alert, AlertDescription, AlertTitle } from "@/components/ui/alert"; import { MessageSquare, Loader2, Settings, Terminal, Users } from "lucide-react"; import { StreamMessage } from "@/components/ui/stream-message"; +import { LoadingDots } from "@/components/ui/message"; import { DropdownMenu, DropdownMenuContent, @@ -28,10 +29,11 @@ export type MessagesTabProps = { onContinue: () => void; workflowMetadata?: WorkflowMetadata; onCommandClick?: (slashCommand: string) => void; + isRunActive?: boolean; // NEW: Track if agent is actively processing }; -const MessagesTab: React.FC = ({ session, streamMessages, chatInput, setChatInput, onSendChat, onInterrupt, onEndSession, onGoToResults, onContinue, workflowMetadata, onCommandClick }) => { +const MessagesTab: React.FC = ({ session, streamMessages, chatInput, setChatInput, onSendChat, onInterrupt, onEndSession, onGoToResults, onContinue, workflowMetadata, onCommandClick, isRunActive = false }) => { const [sendingChat, setSendingChat] = useState(false); const [interrupting, setInterrupting] = useState(false); const [ending, setEnding] = useState(false); @@ -277,6 +279,13 @@ const MessagesTab: React.FC = ({ session, streamMessages, chat ))} + {/* Show loading indicator when agent is actively processing */} + {isRunActive && filteredMessages.length > 0 && ( +
+ +
+ )} + {filteredMessages.length === 0 && isCreating && (
@@ -337,7 +346,7 @@ const MessagesTab: React.FC = ({ session, streamMessages, chat