diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 0e359f93..b413c966 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -14,6 +14,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "unicode/utf8" @@ -46,6 +47,20 @@ var ( const runnerTokenRefreshedAtAnnotation = "ambient-code.io/token-refreshed-at" +// ootbWorkflowsCache provides in-memory caching for OOTB workflows to avoid GitHub API rate limits. +// The cache stores workflows by repo URL key and expires after ootbCacheTTL. +type ootbWorkflowsCache struct { + mu sync.RWMutex + workflows []OOTBWorkflow + cachedAt time.Time + cacheKey string // repo+branch+path combination +} + +var ( + ootbCache = &ootbWorkflowsCache{} + ootbCacheTTL = 5 * time.Minute // Cache OOTB workflows for 5 minutes +) + // isBinaryContentType checks if a MIME type represents binary content that should be base64 encoded. // This includes images, archives, documents, executables, and other non-text formats. func isBinaryContentType(contentType string) bool { @@ -1718,35 +1733,10 @@ type OOTBWorkflow struct { } // ListOOTBWorkflows returns the list of out-of-the-box workflows dynamically discovered from GitHub -// Attempts to use user's GitHub token for better rate limits, falls back to unauthenticated for public repos +// Uses in-memory caching (5 min TTL) to avoid GitHub API rate limits. +// Attempts to use user's GitHub token for better rate limits when cache miss occurs. 
// GET /api/workflows/ootb?project= func ListOOTBWorkflows(c *gin.Context) { - // Try to get user's GitHub token (best effort - not required) - // This gives better rate limits (5000/hr vs 60/hr) and supports private repos - // Project is optional - if provided, we'll try to get the user's token - token := "" - project := c.Query("project") // Optional query parameter - if project != "" { - usrID, _ := c.Get("userID") - k8sClt, sessDyn := GetK8sClientsForRequest(c) - if k8sClt == nil || sessDyn == nil { - c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) - c.Abort() - return - } - if userIDStr, ok := usrID.(string); ok && userIDStr != "" { - if githubToken, err := GetGitHubToken(c.Request.Context(), k8sClt, sessDyn, project, userIDStr); err == nil { - token = githubToken - log.Printf("ListOOTBWorkflows: using user's GitHub token for project %s (better rate limits)", project) - } else { - log.Printf("ListOOTBWorkflows: failed to get GitHub token for project %s: %v", project, err) - } - } - } - if token == "" { - log.Printf("ListOOTBWorkflows: proceeding without GitHub token (public repo, lower rate limits)") - } - // Read OOTB repo configuration from environment ootbRepo := strings.TrimSpace(os.Getenv("OOTB_WORKFLOWS_REPO")) if ootbRepo == "" { @@ -1763,6 +1753,43 @@ func ListOOTBWorkflows(c *gin.Context) { ootbWorkflowsPath = "workflows" } + // Build cache key from repo configuration + cacheKey := fmt.Sprintf("%s|%s|%s", ootbRepo, ootbBranch, ootbWorkflowsPath) + + // Check cache first (read lock); capture age under the lock to avoid racing with writers + ootbCache.mu.RLock() + if ootbCache.cacheKey == cacheKey && time.Since(ootbCache.cachedAt) < ootbCacheTTL && len(ootbCache.workflows) > 0 { + workflows, age := ootbCache.workflows, time.Since(ootbCache.cachedAt) + ootbCache.mu.RUnlock() + log.Printf("ListOOTBWorkflows: returning %d cached workflows (age: %v)", len(workflows), age.Round(time.Second)) + c.JSON(http.StatusOK, gin.H{"workflows": workflows}) + return + } + ootbCache.mu.RUnlock() + + // Cache
miss - need to fetch from GitHub + // Try to get user's GitHub token (best effort - not required) + // This gives better rate limits (5000/hr vs 60/hr) and supports private repos + token := "" + project := c.Query("project") // Optional query parameter + if project != "" { + usrID, _ := c.Get("userID") + k8sClt, sessDyn := GetK8sClientsForRequest(c) + if k8sClt != nil && sessDyn != nil { + if userIDStr, ok := usrID.(string); ok && userIDStr != "" { + if githubToken, err := GetGitHubToken(c.Request.Context(), k8sClt, sessDyn, project, userIDStr); err == nil { + token = githubToken + log.Printf("ListOOTBWorkflows: using user's GitHub token for project %s (better rate limits)", project) + } else { + log.Printf("ListOOTBWorkflows: failed to get GitHub token for project %s: %v", project, err) + } + } + } + } + if token == "" { + log.Printf("ListOOTBWorkflows: proceeding without GitHub token (public repo, lower rate limits)") + } + // Parse GitHub URL owner, repoName, err := git.ParseGitHubURL(ootbRepo) if err != nil { @@ -1775,6 +1802,16 @@ func ListOOTBWorkflows(c *gin.Context) { entries, err := fetchGitHubDirectoryListing(c.Request.Context(), owner, repoName, ootbBranch, ootbWorkflowsPath, token) if err != nil { log.Printf("ListOOTBWorkflows: failed to list workflows directory: %v", err) + // On error, try to return stale cache if available + ootbCache.mu.RLock() + if len(ootbCache.workflows) > 0 && ootbCache.cacheKey == cacheKey { + workflows := ootbCache.workflows + ootbCache.mu.RUnlock() + log.Printf("ListOOTBWorkflows: returning stale cached workflows due to GitHub error") + c.JSON(http.StatusOK, gin.H{"workflows": workflows}) + return + } + ootbCache.mu.RUnlock() c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to discover OOTB workflows"}) return } @@ -1822,7 +1859,14 @@ func ListOOTBWorkflows(c *gin.Context) { }) } - log.Printf("ListOOTBWorkflows: discovered %d workflows from %s", len(workflows), ootbRepo) + // Update cache (write lock) + 
ootbCache.mu.Lock() + ootbCache.workflows = workflows + ootbCache.cachedAt = time.Now() + ootbCache.cacheKey = cacheKey + ootbCache.mu.Unlock() + + log.Printf("ListOOTBWorkflows: discovered %d workflows from %s (cached for %v)", len(workflows), ootbRepo, ootbCacheTTL) c.JSON(http.StatusOK, gin.H{"workflows": workflows}) } diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index 6a60ba79..1122bdc8 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -147,26 +147,58 @@ func HandleAGUIRunProxy(c *gin.Context) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Hour) defer cancel() - proxyReq, err := http.NewRequestWithContext(ctx, "POST", runnerURL, bytes.NewReader(bodyBytes)) - if err != nil { - log.Printf("AGUI Proxy: Failed to create request in background: %v", err) - updateRunStatus(runID, "error") - return - } - - // Forward headers - proxyReq.Header.Set("Content-Type", "application/json") - proxyReq.Header.Set("Accept", "text/event-stream") - - // Execute request + // Execute request with retries (runner may not be ready immediately after startup) client := &http.Client{ Timeout: 0, // No timeout, context handles it } - resp, err := client.Do(proxyReq) - if err != nil { - log.Printf("AGUI Proxy: Background request failed: %v", err) - updateRunStatus(runID, "error") - return + + var resp *http.Response + maxRetries := 15 + retryDelay := 500 * time.Millisecond + + for attempt := 1; attempt <= maxRetries; attempt++ { + // Create fresh request for each attempt (body reader needs reset) + proxyReq, err := http.NewRequestWithContext(ctx, "POST", runnerURL, bytes.NewReader(bodyBytes)) + if err != nil { + log.Printf("AGUI Proxy: Failed to create request in background: %v", err) + updateRunStatus(runID, "error") + return + } + + // Forward headers + proxyReq.Header.Set("Content-Type", "application/json") + proxyReq.Header.Set("Accept", 
"text/event-stream") + + resp, err = client.Do(proxyReq) + if err == nil { + break // Success! + } + + // Check if it's a connection refused error (runner not ready yet) + errStr := err.Error() + isConnectionRefused := strings.Contains(errStr, "connection refused") || + strings.Contains(errStr, "no such host") || + strings.Contains(errStr, "dial tcp") + + if !isConnectionRefused || attempt == maxRetries { + log.Printf("AGUI Proxy: Background request failed after %d attempts: %v", attempt, err) + updateRunStatus(runID, "error") + return + } + + log.Printf("AGUI Proxy: Runner not ready (attempt %d/%d), retrying in %v...", attempt, maxRetries, retryDelay) + + select { + case <-ctx.Done(): + log.Printf("AGUI Proxy: Context cancelled during retry for run %s", runID) + updateRunStatus(runID, "error"); return + case <-time.After(retryDelay): + // Exponential backoff with cap at 5 seconds + retryDelay = time.Duration(float64(retryDelay) * 1.5) + if retryDelay > 5*time.Second { + retryDelay = 5 * time.Second + } + } } defer resp.Body.Close() diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 15800ba4..419e493d 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -76,9 +76,8 @@ def __init__(self): self._current_run_id: Optional[str] = None self._current_thread_id: Optional[str] = None - # Active Claude SDK client for interrupt support + # Active client reference for interrupt support self._active_client: Optional[Any] = None - self._active_client_ctx: Optional[Any] = None async def initialize(self, context: RunnerContext): """Initialize the adapter with context.""" @@ -116,6 +115,7 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven Args: input_data: RunAgentInput with thread_id, run_id, messages, tools + (sole parameter; per-run state is tracked on the adapter instance) Yields: AG-UI events (RunStartedEvent, TextMessageContentEvent,
etc.) @@ -126,6 +126,10 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven self._current_thread_id = thread_id self._current_run_id = run_id + # Check for newly available Google OAuth credentials (user may have authenticated mid-session) + # This picks up credentials after K8s syncs the mounted secret (~60s after OAuth completes) + await self.refresh_google_credentials() + try: # Emit RUN_STARTED yield RunStartedEvent( @@ -278,8 +282,16 @@ def _extract_user_message(self, input_data: RunAgentInput) -> str: async def _run_claude_agent_sdk( self, prompt: str, thread_id: str, run_id: str ) -> AsyncIterator[BaseEvent]: - """Execute the Claude Code SDK with the given prompt and yield AG-UI events.""" - logger.info(f"_run_claude_agent_sdk called with prompt length={len(prompt)}") + """Execute the Claude Code SDK with the given prompt and yield AG-UI events. + + Creates a fresh client for each run - simpler and more reliable than client reuse. + + Args: + prompt: The user prompt to send to Claude + thread_id: AG-UI thread identifier + run_id: AG-UI run identifier + """ + logger.info(f"_run_claude_agent_sdk called with prompt length={len(prompt)}, will create fresh client") try: # Check for authentication method logger.info("Checking authentication configuration...") @@ -467,13 +479,15 @@ def create_sdk_client(opts, disable_continue=False): opts.continue_conversation = False return ClaudeSDKClient(options=opts) - # Create SDK client with retry logic + # Always create a fresh client for each run (simple and reliable) + logger.info("Creating new ClaudeSDKClient for this run...") + try: - logger.info("Creating ClaudeSDKClient context manager...") - client_ctx = create_sdk_client(options) - logger.info("Entering ClaudeSDKClient context (initializing subprocess)...") - client = await client_ctx.__aenter__() - logger.info("ClaudeSDKClient initialized successfully!") + logger.info("Creating ClaudeSDKClient...") + client = create_sdk_client(options) 
+ logger.info("Connecting ClaudeSDKClient (initializing subprocess)...") + await client.connect() + logger.info("ClaudeSDKClient connected successfully!") except Exception as resume_error: error_str = str(resume_error).lower() if "no conversation found" in error_str or "session" in error_str: @@ -484,16 +498,15 @@ def create_sdk_client(opts, disable_continue=False): run_id=run_id, event={"type": "system_log", "message": "⚠️ Could not continue conversation, starting fresh..."} ) - client_ctx = create_sdk_client(options, disable_continue=True) - client = await client_ctx.__aenter__() + client = create_sdk_client(options, disable_continue=True) + await client.connect() else: raise - # Store active client for interrupt support - self._active_client = client - self._active_client_ctx = client_ctx - try: + # Store client reference for interrupt support + self._active_client = client + if not self._first_run: yield RawEvent( type=EventType.RAW, @@ -714,19 +727,19 @@ def create_sdk_client(opts, disable_continue=False): self._first_run = False finally: - await client_ctx.__aexit__(None, None, None) - # Clear active client reference + # Clear active client reference (interrupt no longer valid for this run) self._active_client = None - self._active_client_ctx = None - + + # Always disconnect client at end of run (no persistence) + if client is not None: + logger.info("Disconnecting client (end of run)") + await client.disconnect() + # Finalize observability await obs.finalize() except Exception as e: logger.error(f"Failed to run Claude Code SDK: {e}") - # Clear active client on error - self._active_client = None - self._active_client_ctx = None if 'obs' in locals(): await obs.cleanup_on_error(e) raise @@ -734,14 +747,11 @@ def create_sdk_client(opts, disable_continue=False): async def interrupt(self) -> None: """ Interrupt the active Claude SDK execution. - - Sends interrupt signal to stop Claude mid-execution. 
- See: https://platform.claude.com/docs/en/agent-sdk/python#methods """ - if not self._active_client: + if self._active_client is None: logger.warning("Interrupt requested but no active client") return - + try: logger.info("Sending interrupt signal to Claude SDK client...") await self._active_client.interrupt() @@ -749,6 +759,7 @@ async def interrupt(self) -> None: except Exception as e: logger.error(f"Failed to interrupt Claude SDK: {e}") + def _setup_workflow_paths(self, active_workflow_url: str, repos_cfg: list) -> tuple[str, list, str]: """Setup paths for workflow mode.""" add_dirs = [] @@ -1461,12 +1472,35 @@ def _build_workspace_context_prompt(self, repos_cfg, workflow_name, artifacts_pa async def _setup_google_credentials(self): - """Copy Google OAuth credentials from mounted Secret to writable workspace location.""" - # Check if Google OAuth secret is mounted + """Copy Google OAuth credentials from mounted Secret to writable workspace location. + + The secret is always mounted (as placeholder if user hasn't authenticated). + This method checks if credentials.json exists and has content. + Call refresh_google_credentials() periodically to pick up new credentials after OAuth. + """ + await self._try_copy_google_credentials() + + async def _try_copy_google_credentials(self) -> bool: + """Attempt to copy Google credentials from mounted secret. + + Returns: + True if credentials were successfully copied, False otherwise. 
+ """ secret_path = Path("/app/.google_workspace_mcp/credentials/credentials.json") + + # Check if secret file exists if not secret_path.exists(): - logging.debug("Google OAuth credentials not found at %s, skipping setup", secret_path) - return + logging.debug("Google OAuth credentials not found at %s (placeholder secret or not mounted)", secret_path) + return False + + # Check if file has content (not empty placeholder) + try: + if secret_path.stat().st_size == 0: + logging.debug("Google OAuth credentials file is empty (user hasn't authenticated yet)") + return False + except OSError as e: + logging.debug("Could not stat Google OAuth credentials file: %s", e) + return False # Create writable credentials directory in workspace workspace_creds_dir = Path("/workspace/.google_workspace_mcp/credentials") @@ -1479,5 +1513,41 @@ async def _setup_google_credentials(self): # Make it writable so workspace-mcp can update tokens dest_path.chmod(0o644) logging.info("✓ Copied Google OAuth credentials from Secret to writable workspace at %s", dest_path) + return True except Exception as e: - logging.error("Failed to copy Google OAuth credentials: %s", e) \ No newline at end of file + logging.error("Failed to copy Google OAuth credentials: %s", e) + return False + + async def refresh_google_credentials(self) -> bool: + """Check for and copy new Google OAuth credentials. + + Call this method periodically (e.g., before processing a message) to detect + when a user completes the OAuth flow and credentials become available. + + Kubernetes automatically updates the mounted secret volume when the secret + changes (typically within ~60 seconds), so this will pick up new credentials + without requiring a pod restart. + + Returns: + True if new credentials were found and copied, False otherwise. 
+ """ + dest_path = Path("/workspace/.google_workspace_mcp/credentials/credentials.json") + + # If we already have credentials in workspace, check if source is newer + if dest_path.exists(): + secret_path = Path("/app/.google_workspace_mcp/credentials/credentials.json") + if secret_path.exists(): + try: + # Compare modification times - secret mount updates when K8s syncs + if secret_path.stat().st_mtime > dest_path.stat().st_mtime: + logging.info("Detected updated Google OAuth credentials, refreshing...") + return await self._try_copy_google_credentials() + except OSError: + pass + return False + + # No credentials yet, try to copy + if await self._try_copy_google_credentials(): + logging.info("✓ Google OAuth credentials now available (user completed authentication)") + return True + return False \ No newline at end of file diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index 631c303a..dee4e6c9 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -80,6 +80,7 @@ async def lifespan(app: FastAPI): # Import adapter here to avoid circular imports from adapter import ClaudeCodeAdapter + from pathlib import Path # Initialize context from environment session_id = os.getenv("SESSION_ID", "unknown") @@ -95,76 +96,87 @@ async def lifespan(app: FastAPI): adapter = ClaudeCodeAdapter() adapter.context = context - # Check for INITIAL_PROMPT and auto-execute on startup - # Runner knows when it's ready, so this is more reliable than backend timing + logger.info("Adapter initialized - fresh client will be created for each run") + + # Check if this is a restart (conversation history exists) + history_marker = Path(workspace_path) / ".claude" / "state" + + # Check for INITIAL_PROMPT and auto-execute (only if this is first run) initial_prompt = os.getenv("INITIAL_PROMPT", "").strip() - if initial_prompt: - logger.info(f"INITIAL_PROMPT detected ({len(initial_prompt)} 
chars), will auto-execute after server starts") - # Schedule auto-execution after server is ready - import asyncio + if initial_prompt and not history_marker.exists(): + logger.info(f"INITIAL_PROMPT detected ({len(initial_prompt)} chars), will auto-execute after 3s delay") asyncio.create_task(auto_execute_initial_prompt(initial_prompt, session_id)) + elif initial_prompt: + logger.info(f"INITIAL_PROMPT detected but conversation history exists - skipping auto-execution (session restart)") logger.info(f"AG-UI server ready for session {session_id}") yield # Cleanup - logger.info("Shutting down AG-UI server") + logger.info("Shutting down AG-UI server...") async def auto_execute_initial_prompt(prompt: str, session_id: str): - """Auto-execute INITIAL_PROMPT by POSTing to backend after server is ready.""" + """Auto-execute INITIAL_PROMPT by POSTing to backend after short delay. + + The 3-second delay gives the runner time to fully start. Backend has retry + logic to handle if Service DNS isn't ready yet. 
+ """ import uuid import aiohttp - # Wait for FastAPI server to be fully ready - await asyncio.sleep(2) + # Give runner time to fully start before backend tries to reach us + logger.info("Waiting 3s before auto-executing INITIAL_PROMPT (allow Service DNS to propagate)...") + await asyncio.sleep(3) - logger.info(f"Auto-executing INITIAL_PROMPT via backend POST...") + logger.info("Auto-executing INITIAL_PROMPT via backend POST...") + + # Get backend URL from environment + backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/") + project_name = os.getenv("PROJECT_NAME", "").strip() or os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip() + + if not backend_url or not project_name: + logger.error("Cannot auto-execute INITIAL_PROMPT: BACKEND_API_URL or PROJECT_NAME not set") + return + + # BACKEND_API_URL already includes /api suffix from operator + url = f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run" + logger.info(f"Auto-execution URL: {url}") + + payload = { + "threadId": session_id, + "runId": str(uuid.uuid4()), + "messages": [{ + "id": str(uuid.uuid4()), + "role": "user", + "content": prompt, + "metadata": { + "hidden": True, + "autoSent": True, + "source": "runner_initial_prompt" + } + }] + } + + # Get BOT_TOKEN for auth + bot_token = os.getenv("BOT_TOKEN", "").strip() + headers = {"Content-Type": "application/json"} + if bot_token: + headers["Authorization"] = f"Bearer {bot_token}" try: - # Get backend URL from environment - backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/") - project_name = os.getenv("PROJECT_NAME", "").strip() or os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip() - - if not backend_url or not project_name: - logger.error("Cannot auto-execute INITIAL_PROMPT: BACKEND_API_URL or PROJECT_NAME not set") - return - - url = f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run" - - payload = { - "threadId": session_id, - "runId": str(uuid.uuid4()), - "messages": [{ - "id": 
str(uuid.uuid4()), - "role": "user", - "content": prompt, - "metadata": { - "hidden": True, - "autoSent": True, - "source": "runner_auto_execute" - } - }] - } - - # Get BOT_TOKEN for auth - bot_token = os.getenv("BOT_TOKEN", "").strip() - headers = {"Content-Type": "application/json"} - if bot_token: - headers["Authorization"] = f"Bearer {bot_token}" - async with aiohttp.ClientSession() as session: - async with session.post(url, json=payload, headers=headers) as resp: + async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp: if resp.status == 200: result = await resp.json() logger.info(f"INITIAL_PROMPT auto-execution started: {result}") else: error_text = await resp.text() - logger.error(f"INITIAL_PROMPT auto-execution failed: {resp.status} - {error_text}") - + logger.warning(f"INITIAL_PROMPT failed with status {resp.status}: {error_text[:200]}") except Exception as e: - logger.error(f"Failed to auto-execute INITIAL_PROMPT: {e}") + logger.warning(f"INITIAL_PROMPT auto-execution error (backend will retry): {e}") + app = FastAPI( @@ -218,7 +230,8 @@ async def event_generator(): _adapter_initialized = True logger.info("Starting adapter.process_run()...") - # Process the actual run + + # Process the run (creates fresh client each time) async for event in adapter.process_run(run_agent_input): logger.debug(f"Yielding run event: {event.type}") yield encoder.encode(event) @@ -296,7 +309,7 @@ async def change_workflow(request: Request): _adapter_initialized = False adapter._first_run = True - logger.info("Workflow updated, triggering new run with workflow greeting") + logger.info("Workflow updated, adapter will reinitialize on next run") # Trigger a new run to greet user with workflow context # This runs in background via backend POST