From 9cc14d1d778ae473e0ecca804ea6d7dfdfcf756f Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Thu, 18 Dec 2025 12:44:42 -0600 Subject: [PATCH 1/3] Add in-memory caching for OOTB workflows to optimize GitHub API usage Implemented a new caching mechanism for out-of-the-box workflows to reduce GitHub API rate limits. The cache stores workflows for 5 minutes and checks for existing cached data before making API calls. Updated the ListOOTBWorkflows function to utilize this cache, improving performance and efficiency. --- components/backend/handlers/sessions.go | 100 +++++++++---- .../runners/claude-code-runner/adapter.py | 24 +--- components/runners/claude-code-runner/main.py | 131 +++++++++++------- 3 files changed, 153 insertions(+), 102 deletions(-) diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 0e359f93..b413c966 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -14,6 +14,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "unicode/utf8" @@ -46,6 +47,20 @@ var ( const runnerTokenRefreshedAtAnnotation = "ambient-code.io/token-refreshed-at" +// ootbWorkflowsCache provides in-memory caching for OOTB workflows to avoid GitHub API rate limits. +// The cache stores workflows by repo URL key and expires after ootbCacheTTL. +type ootbWorkflowsCache struct { + mu sync.RWMutex + workflows []OOTBWorkflow + cachedAt time.Time + cacheKey string // repo+branch+path combination +} + +var ( + ootbCache = &ootbWorkflowsCache{} + ootbCacheTTL = 5 * time.Minute // Cache OOTB workflows for 5 minutes +) + // isBinaryContentType checks if a MIME type represents binary content that should be base64 encoded. // This includes images, archives, documents, executables, and other non-text formats. func isBinaryContentType(contentType string) bool { @@ -1718,35 +1733,10 @@ type OOTBWorkflow struct { } // ListOOTBWorkflows returns the list of out-of-the-box workflows dynamically discovered from GitHub -// Attempts to use user's GitHub token for better rate limits, falls back to unauthenticated for public repos +// Uses in-memory caching (5 min TTL) to avoid GitHub API rate limits. +// Attempts to use user's GitHub token for better rate limits when cache miss occurs. // GET /api/workflows/ootb?project= func ListOOTBWorkflows(c *gin.Context) { - // Try to get user's GitHub token (best effort - not required) - // This gives better rate limits (5000/hr vs 60/hr) and supports private repos - // Project is optional - if provided, we'll try to get the user's token - token := "" - project := c.Query("project") // Optional query parameter - if project != "" { - usrID, _ := c.Get("userID") - k8sClt, sessDyn := GetK8sClientsForRequest(c) - if k8sClt == nil || sessDyn == nil { - c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"}) - c.Abort() - return - } - if userIDStr, ok := usrID.(string); ok && userIDStr != "" { - if githubToken, err := GetGitHubToken(c.Request.Context(), k8sClt, sessDyn, project, userIDStr); err == nil { - token = githubToken - log.Printf("ListOOTBWorkflows: using user's GitHub token for project %s (better rate limits)", project) - } else { - log.Printf("ListOOTBWorkflows: failed to get GitHub token for project %s: %v", project, err) - } - } - } - if token == "" { - log.Printf("ListOOTBWorkflows: proceeding without GitHub token (public repo, lower rate limits)") - } - // Read OOTB repo configuration from environment ootbRepo := strings.TrimSpace(os.Getenv("OOTB_WORKFLOWS_REPO")) if ootbRepo == "" { @@ -1763,6 +1753,43 @@ func ListOOTBWorkflows(c *gin.Context) { ootbWorkflowsPath = "workflows" } + // Build cache key from repo configuration + cacheKey := fmt.Sprintf("%s|%s|%s", ootbRepo, ootbBranch, ootbWorkflowsPath) + + // Check cache first (read lock) + ootbCache.mu.RLock() + if ootbCache.cacheKey == cacheKey && time.Since(ootbCache.cachedAt) < ootbCacheTTL && len(ootbCache.workflows) > 0 { + workflows := ootbCache.workflows + ootbCache.mu.RUnlock() + log.Printf("ListOOTBWorkflows: returning %d cached workflows (age: %v)", len(workflows), time.Since(ootbCache.cachedAt).Round(time.Second)) + c.JSON(http.StatusOK, gin.H{"workflows": workflows}) + return + } + ootbCache.mu.RUnlock() + + // Cache miss - need to fetch from GitHub + // Try to get user's GitHub token (best effort - not required) + // This gives better rate limits (5000/hr vs 60/hr) and supports private repos + token := "" + project := c.Query("project") // Optional query parameter + if project != "" { + usrID, _ := c.Get("userID") + k8sClt, sessDyn := GetK8sClientsForRequest(c) + if k8sClt != nil && sessDyn != nil { + if userIDStr, ok := usrID.(string); ok && userIDStr != "" { + if githubToken, err := GetGitHubToken(c.Request.Context(), k8sClt, sessDyn, project, userIDStr); err == nil { + token = githubToken + log.Printf("ListOOTBWorkflows: using user's GitHub token for project %s (better rate limits)", project) + } else { + log.Printf("ListOOTBWorkflows: failed to get GitHub token for project %s: %v", project, err) + } + } + } + } + if token == "" { + log.Printf("ListOOTBWorkflows: proceeding without GitHub token (public repo, lower rate limits)") + } + // Parse GitHub URL owner, repoName, err := git.ParseGitHubURL(ootbRepo) if err != nil { @@ -1775,6 +1802,16 @@ func ListOOTBWorkflows(c *gin.Context) { entries, err := fetchGitHubDirectoryListing(c.Request.Context(), owner, repoName, ootbBranch, ootbWorkflowsPath, token) if err != nil { log.Printf("ListOOTBWorkflows: failed to list workflows directory: %v", err) + // On error, try to return stale cache if available + ootbCache.mu.RLock() + if len(ootbCache.workflows) > 0 && ootbCache.cacheKey == cacheKey { + workflows := ootbCache.workflows + ootbCache.mu.RUnlock() + log.Printf("ListOOTBWorkflows: returning stale cached workflows due to GitHub error") + c.JSON(http.StatusOK, gin.H{"workflows": workflows}) + return + } + ootbCache.mu.RUnlock() c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to discover OOTB workflows"}) return } @@ -1822,7 +1859,14 @@ func ListOOTBWorkflows(c *gin.Context) { }) } - log.Printf("ListOOTBWorkflows: discovered %d workflows from %s", len(workflows), ootbRepo) + // Update cache (write lock) + ootbCache.mu.Lock() + ootbCache.workflows = workflows + ootbCache.cachedAt = time.Now() + ootbCache.cacheKey = cacheKey + ootbCache.mu.Unlock() + + log.Printf("ListOOTBWorkflows: discovered %d workflows from %s (cached for %v)", len(workflows), ootbRepo, ootbCacheTTL) c.JSON(http.StatusOK, gin.H{"workflows": workflows}) } diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 15800ba4..46c305de 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -75,10 +75,6 @@ def __init__(self): self._current_tool_id: Optional[str] = None self._current_run_id: Optional[str] = None self._current_thread_id: Optional[str] = None - - # Active Claude SDK client for interrupt support - self._active_client: Optional[Any] = None - self._active_client_ctx: Optional[Any] = None async def initialize(self, context: RunnerContext): """Initialize the adapter with context.""" @@ -489,10 +485,6 @@ def create_sdk_client(opts, disable_continue=False): else: raise - # Store active client for interrupt support - self._active_client = client - self._active_client_ctx = client_ctx - try: if not self._first_run: yield RawEvent( @@ -715,18 +707,12 @@ def create_sdk_client(opts, disable_continue=False): finally: await client_ctx.__aexit__(None, None, None) - # Clear active client reference - self._active_client = None - self._active_client_ctx = None - + # Finalize observability await obs.finalize() except Exception as e: logger.error(f"Failed to run Claude Code SDK: {e}") - # Clear active client on error - self._active_client = None - self._active_client_ctx = None if 'obs' in locals(): await obs.cleanup_on_error(e) raise @@ -734,14 +720,7 @@ def create_sdk_client(opts, disable_continue=False): async def interrupt(self) -> None: """ Interrupt the active Claude SDK execution. - - Sends interrupt signal to stop Claude mid-execution. - See: https://platform.claude.com/docs/en/agent-sdk/python#methods """ - if not self._active_client: - logger.warning("Interrupt requested but no active client") - return - try: logger.info("Sending interrupt signal to Claude SDK client...") await self._active_client.interrupt() @@ -749,6 +728,7 @@ async def interrupt(self) -> None: except Exception as e: logger.error(f"Failed to interrupt Claude SDK: {e}") + def _setup_workflow_paths(self, active_workflow_url: str, repos_cfg: list) -> tuple[str, list, str]: """Setup paths for workflow mode.""" add_dirs = [] diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index 631c303a..5c11699c 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -95,14 +95,19 @@ async def lifespan(app: FastAPI): adapter = ClaudeCodeAdapter() adapter.context = context - # Check for INITIAL_PROMPT and auto-execute on startup - # Runner knows when it's ready, so this is more reliable than backend timing + # Check for INITIAL_PROMPT and auto-execute (only if this is first run) + # Skip if conversation history already exists (session restart) initial_prompt = os.getenv("INITIAL_PROMPT", "").strip() if initial_prompt: - logger.info(f"INITIAL_PROMPT detected ({len(initial_prompt)} chars), will auto-execute after server starts") - # Schedule auto-execution after server is ready - import asyncio - asyncio.create_task(auto_execute_initial_prompt(initial_prompt, session_id)) + # Check if there's already conversation history + from pathlib import Path + history_marker = Path(workspace_path) / ".claude" / "state" + + if history_marker.exists(): + logger.info(f"INITIAL_PROMPT detected but conversation history exists - skipping auto-execution (session restart)") + else: + logger.info(f"INITIAL_PROMPT detected ({len(initial_prompt)} chars), will auto-execute") + asyncio.create_task(auto_execute_initial_prompt(initial_prompt, session_id)) logger.info(f"AG-UI server ready for session {session_id}") @@ -113,58 +118,80 @@ async def lifespan(app: FastAPI): async def auto_execute_initial_prompt(prompt: str, session_id: str): - """Auto-execute INITIAL_PROMPT by POSTing to backend after server is ready.""" + """Auto-execute INITIAL_PROMPT by POSTing to backend (via Service). + + We POST to the backend so events flow through the proxy and are visible in the UI. + Retries handle Service DNS propagation delays naturally. + """ import uuid import aiohttp - # Wait for FastAPI server to be fully ready - await asyncio.sleep(2) - - logger.info(f"Auto-executing INITIAL_PROMPT via backend POST...") + logger.info("Auto-executing INITIAL_PROMPT via backend POST (will retry until Service DNS is ready)...") + + # Get backend URL from environment + backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/") + project_name = os.getenv("PROJECT_NAME", "").strip() or os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip() + + if not backend_url or not project_name: + logger.error("Cannot auto-execute INITIAL_PROMPT: BACKEND_API_URL or PROJECT_NAME not set") + logger.error(f" BACKEND_API_URL={os.getenv('BACKEND_API_URL', '(not set)')}") + logger.error(f" PROJECT_NAME={os.getenv('PROJECT_NAME', '(not set)')}") + logger.error(f" AGENTIC_SESSION_NAMESPACE={os.getenv('AGENTIC_SESSION_NAMESPACE', '(not set)')}") + return + + # BACKEND_API_URL already includes /api suffix from operator + url = f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run" + logger.info(f"Auto-execution URL: {url}") + + payload = { + "threadId": session_id, + "runId": str(uuid.uuid4()), + "messages": [{ + "id": str(uuid.uuid4()), + "role": "user", + "content": prompt, + "metadata": { + "hidden": True, + "autoSent": True, + "source": "runner_auto_execute" + } + }] + } - try: - # Get backend URL from environment - backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/") - project_name = os.getenv("PROJECT_NAME", "").strip() or os.getenv("AGENTIC_SESSION_NAMESPACE", "").strip() - - if not backend_url or not project_name: - logger.error("Cannot auto-execute INITIAL_PROMPT: BACKEND_API_URL or PROJECT_NAME not set") - return - - url = f"{backend_url}/projects/{project_name}/agentic-sessions/{session_id}/agui/run" - - payload = { - "threadId": session_id, - "runId": str(uuid.uuid4()), - "messages": [{ - "id": str(uuid.uuid4()), - "role": "user", - "content": prompt, - "metadata": { - "hidden": True, - "autoSent": True, - "source": "runner_auto_execute" - } - }] - } + # Get BOT_TOKEN for auth + bot_token = os.getenv("BOT_TOKEN", "").strip() + headers = {"Content-Type": "application/json"} + if bot_token: + headers["Authorization"] = f"Bearer {bot_token}" + + # Retry aggressively - Service DNS can take 10-20 seconds to propagate + # First retry happens immediately, then we back off + max_retries = 10 + + for attempt in range(max_retries): + # Exponential backoff: 0, 2, 3, 4, 5, 5, 5... seconds + if attempt > 0: + delay = min(2 + attempt, 5) + await asyncio.sleep(delay) - # Get BOT_TOKEN for auth - bot_token = os.getenv("BOT_TOKEN", "").strip() - headers = {"Content-Type": "application/json"} - if bot_token: - headers["Authorization"] = f"Bearer {bot_token}" + try: + async with aiohttp.ClientSession() as session: + async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp: + if resp.status == 200: + result = await resp.json() + logger.info(f"INITIAL_PROMPT auto-execution started: {result}") + return + else: + error_text = await resp.text() + logger.warning(f"INITIAL_PROMPT attempt {attempt + 1}/{max_retries} failed: {resp.status} - {error_text[:200]}") - async with aiohttp.ClientSession() as session: - async with session.post(url, json=payload, headers=headers) as resp: - if resp.status == 200: - result = await resp.json() - logger.info(f"INITIAL_PROMPT auto-execution started: {result}") - else: - error_text = await resp.text() - logger.error(f"INITIAL_PROMPT auto-execution failed: {resp.status} - {error_text}") + except aiohttp.ClientConnectorError as e: + # Connection error - likely Service DNS not ready yet + logger.info(f"INITIAL_PROMPT attempt {attempt + 1}/{max_retries}: Service not ready yet, will retry...") + except Exception as e: + logger.warning(f"INITIAL_PROMPT attempt {attempt + 1}/{max_retries} error: {e}") - except Exception as e: - logger.error(f"Failed to auto-execute INITIAL_PROMPT: {e}") + logger.error(f"Failed to auto-execute INITIAL_PROMPT after {max_retries} attempts (Service DNS may not have propagated)") app = FastAPI( @@ -296,7 +323,7 @@ async def change_workflow(request: Request): _adapter_initialized = False adapter._first_run = True - logger.info("Workflow updated, triggering new run with workflow greeting") + logger.info("Workflow updated, adapter will reinitialize on next run") # Trigger a new run to greet user with workflow context # This runs in background via backend POST From 422497ade150bb3e2b22342a32f3cfe18c79cc17 Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Thu, 18 Dec 2025 14:48:53 -0600 Subject: [PATCH 2/3] Implement retry logic for AGUI proxy requests and enhance ClaudeCodeAdapter with persistent client support The AGUI proxy now includes a retry mechanism for background requests to the runner, allowing for up to 15 attempts with exponential backoff if the runner is not ready. Additionally, the ClaudeCodeAdapter has been updated to support a persistent client for improved performance and to handle Google OAuth credentials more effectively, ensuring they are refreshed when available. This change enhances the overall reliability and efficiency of the system. --- components/backend/websocket/agui_proxy.go | 66 ++++-- .../runners/claude-code-runner/adapter.py | 217 +++++++++++++++--- components/runners/claude-code-runner/main.py | 142 +++++++----- 3 files changed, 321 insertions(+), 104 deletions(-) diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index 6a60ba79..1122bdc8 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -147,26 +147,58 @@ func HandleAGUIRunProxy(c *gin.Context) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Hour) defer cancel() - proxyReq, err := http.NewRequestWithContext(ctx, "POST", runnerURL, bytes.NewReader(bodyBytes)) - if err != nil { - log.Printf("AGUI Proxy: Failed to create request in background: %v", err) - updateRunStatus(runID, "error") - return - } - - // Forward headers - proxyReq.Header.Set("Content-Type", "application/json") - proxyReq.Header.Set("Accept", "text/event-stream") - - // Execute request + // Execute request with retries (runner may not be ready immediately after startup) client := &http.Client{ Timeout: 0, // No timeout, context handles it } - resp, err := client.Do(proxyReq) - if err != nil { - log.Printf("AGUI Proxy: Background request failed: %v", err) - updateRunStatus(runID, "error") - return + + var resp *http.Response + maxRetries := 15 + retryDelay := 500 * time.Millisecond + + for attempt := 1; attempt <= maxRetries; attempt++ { + // Create fresh request for each attempt (body reader needs reset) + proxyReq, err := http.NewRequestWithContext(ctx, "POST", runnerURL, bytes.NewReader(bodyBytes)) + if err != nil { + log.Printf("AGUI Proxy: Failed to create request in background: %v", err) + updateRunStatus(runID, "error") + return + } + + // Forward headers + proxyReq.Header.Set("Content-Type", "application/json") + proxyReq.Header.Set("Accept", "text/event-stream") + + resp, err = client.Do(proxyReq) + if err == nil { + break // Success! + } + + // Check if it's a connection refused error (runner not ready yet) + errStr := err.Error() + isConnectionRefused := strings.Contains(errStr, "connection refused") || + strings.Contains(errStr, "no such host") || + strings.Contains(errStr, "dial tcp") + + if !isConnectionRefused || attempt == maxRetries { + log.Printf("AGUI Proxy: Background request failed after %d attempts: %v", attempt, err) + updateRunStatus(runID, "error") + return + } + + log.Printf("AGUI Proxy: Runner not ready (attempt %d/%d), retrying in %v...", attempt, maxRetries, retryDelay) + + select { + case <-ctx.Done(): + log.Printf("AGUI Proxy: Context cancelled during retry for run %s", runID) + return + case <-time.After(retryDelay): + // Exponential backoff with cap at 5 seconds + retryDelay = time.Duration(float64(retryDelay) * 1.5) + if retryDelay > 5*time.Second { + retryDelay = 5 * time.Second + } + } } defer resp.Body.Close() diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 46c305de..768cfbf7 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -75,6 +75,9 @@ def __init__(self): self._current_tool_id: Optional[str] = None self._current_run_id: Optional[str] = None self._current_thread_id: Optional[str] = None + + # Active client reference for interrupt support + self._active_client: Optional[Any] = None async def initialize(self, context: RunnerContext): """Initialize the adapter with context.""" @@ -104,7 +107,7 @@ def _timestamp(self) -> str: """Return current UTC timestamp in ISO format.""" return datetime.now(timezone.utc).isoformat() - async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: + async def process_run(self, input_data: RunAgentInput, app_state: Optional[Any] = None) -> AsyncIterator[BaseEvent]: """ Process a run and yield AG-UI events. @@ -112,6 +115,7 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven Args: input_data: RunAgentInput with thread_id, run_id, messages, tools + app_state: Optional FastAPI app.state for persistent client storage/reuse Yields: AG-UI events (RunStartedEvent, TextMessageContentEvent, etc.) @@ -122,6 +126,10 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven self._current_thread_id = thread_id self._current_run_id = run_id + # Check for newly available Google OAuth credentials (user may have authenticated mid-session) + # This picks up credentials after K8s syncs the mounted secret (~60s after OAuth completes) + await self.refresh_google_credentials() + try: # Emit RUN_STARTED yield RunStartedEvent( @@ -206,7 +214,7 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven # Run Claude SDK and yield events logger.info(f"Starting Claude SDK with prompt: '{user_message[:50]}...'") - async for event in self._run_claude_agent_sdk(user_message, thread_id, run_id): + async for event in self._run_claude_agent_sdk(user_message, thread_id, run_id, app_state=app_state): yield event logger.info(f"Claude SDK processing completed for run {run_id}") @@ -272,10 +280,23 @@ def _extract_user_message(self, input_data: RunAgentInput) -> str: return "" async def _run_claude_agent_sdk( - self, prompt: str, thread_id: str, run_id: str + self, prompt: str, thread_id: str, run_id: str, app_state: Optional[Any] = None ) -> AsyncIterator[BaseEvent]: - """Execute the Claude Code SDK with the given prompt and yield AG-UI events.""" - logger.info(f"_run_claude_agent_sdk called with prompt length={len(prompt)}") + """Execute the Claude Code SDK with the given prompt and yield AG-UI events. + + Args: + prompt: The user prompt to send to Claude + thread_id: AG-UI thread identifier + run_id: AG-UI run identifier + app_state: Optional FastAPI app.state for persistent client storage/reuse + """ + # Check if we have a persistent client in app_state + has_persistent_client = ( + app_state is not None and + hasattr(app_state, 'claude_client') and + app_state.claude_client is not None + ) + logger.info(f"_run_claude_agent_sdk called with prompt length={len(prompt)}, has_persistent_client={has_persistent_client}") try: # Check for authentication method logger.info("Checking authentication configuration...") @@ -463,29 +484,90 @@ def create_sdk_client(opts, disable_continue=False): opts.continue_conversation = False return ClaudeSDKClient(options=opts) - # Create SDK client with retry logic - try: - logger.info("Creating ClaudeSDKClient context manager...") - client_ctx = create_sdk_client(options) - logger.info("Entering ClaudeSDKClient context (initializing subprocess)...") - client = await client_ctx.__aenter__() - logger.info("ClaudeSDKClient initialized successfully!") - except Exception as resume_error: - error_str = str(resume_error).lower() - if "no conversation found" in error_str or "session" in error_str: - logger.warning(f"Conversation continuation failed: {resume_error}") - yield RawEvent( - type=EventType.RAW, - thread_id=thread_id, - run_id=run_id, - event={"type": "system_log", "message": "⚠️ Could not continue conversation, starting fresh..."} - ) - client_ctx = create_sdk_client(options, disable_continue=True) - client = await client_ctx.__aenter__() - else: - raise + # Check if we can reuse persistent client from app_state + client_is_alive = False + if has_persistent_client: + # Check if the underlying subprocess is still alive + client = app_state.claude_client + try: + # The SDK client has an internal transport with a subprocess + transport = getattr(client, '_transport', None) + if transport: + proc = getattr(transport, '_process', None) or getattr(transport, 'process', None) + if proc and hasattr(proc, 'returncode'): + client_is_alive = proc.returncode is None + if not client_is_alive: + logger.warning(f"Persistent client subprocess exited (code={proc.returncode}), will create new one") + else: + # Can't check, assume alive + client_is_alive = True + else: + # No transport, assume alive + client_is_alive = True + except Exception as e: + logger.warning(f"Could not check client subprocess status: {e}") + client_is_alive = False + + if has_persistent_client and client_is_alive: + # Reusing persistent client from app.state + logger.info("♻️ Reusing persistent Claude SDK client (conversation continuity)") + client = app_state.claude_client + client_ctx = None # Not managing lifecycle - client is persistent + yield RawEvent( + type=EventType.RAW, + thread_id=thread_id, + run_id=run_id, + event={"type": "system_log", "message": "♻️ Reusing persistent client (instant startup!)"} + ) + else: + # Clear stale persistent client if subprocess died + if has_persistent_client and not client_is_alive: + logger.info("Clearing stale persistent client") + # Try to disconnect cleanly + try: + if app_state.claude_client: + await app_state.claude_client.disconnect() + except Exception as e: + logger.warning(f"Error disconnecting stale client: {e}") + app_state.claude_client = None + app_state.claude_client_ctx = None + + # Create new client with full options using explicit connect() pattern + # (matches SDK docs example for continuous conversation) + logger.info("Creating new ClaudeSDKClient with full options...") + + try: + logger.info("Creating ClaudeSDKClient...") + client = create_sdk_client(options) + logger.info("Connecting ClaudeSDKClient (initializing subprocess)...") + await client.connect() + logger.info("ClaudeSDKClient connected successfully!") + client_ctx = None # Using explicit connect/disconnect, not context manager + except Exception as resume_error: + error_str = str(resume_error).lower() + if "no conversation found" in error_str or "session" in error_str: + logger.warning(f"Conversation continuation failed: {resume_error}") + yield RawEvent( + type=EventType.RAW, + thread_id=thread_id, + run_id=run_id, + event={"type": "system_log", "message": "⚠️ Could not continue conversation, starting fresh..."} + ) + client = create_sdk_client(options, disable_continue=True) + await client.connect() + else: + raise + + # Store in app_state for reuse in subsequent requests + if app_state is not None: + logger.info("✅ Storing persistent client in app_state for future requests") + app_state.claude_client = client + app_state.claude_client_ctx = None # Using explicit connect/disconnect try: + # Store client reference for interrupt support + self._active_client = client + if not self._first_run: yield RawEvent( type=EventType.RAW, @@ -706,7 +788,15 @@ def create_sdk_client(opts, disable_continue=False): self._first_run = False finally: - await client_ctx.__aexit__(None, None, None) + # Clear active client reference (interrupt no longer valid for this run) + self._active_client = None + + # Only destroy client if we created it (not reusing persistent one) + if client_ctx is not None: + logger.info("Cleaning up newly created client for this request") + await client_ctx.__aexit__(None, None, None) + else: + logger.info("Keeping persistent client alive for next request") # Finalize observability await obs.finalize() @@ -721,6 +811,10 @@ async def interrupt(self) -> None: """ Interrupt the active Claude SDK execution. """ + if self._active_client is None: + logger.warning("Interrupt requested but no active client") + return + try: logger.info("Sending interrupt signal to Claude SDK client...") await self._active_client.interrupt() @@ -1441,12 +1535,35 @@ def _build_workspace_context_prompt(self, repos_cfg, workflow_name, artifacts_pa async def _setup_google_credentials(self): - """Copy Google OAuth credentials from mounted Secret to writable workspace location.""" - # Check if Google OAuth secret is mounted + """Copy Google OAuth credentials from mounted Secret to writable workspace location. + + The secret is always mounted (as placeholder if user hasn't authenticated). + This method checks if credentials.json exists and has content. + Call refresh_google_credentials() periodically to pick up new credentials after OAuth. + """ + await self._try_copy_google_credentials() + + async def _try_copy_google_credentials(self) -> bool: + """Attempt to copy Google credentials from mounted secret. + + Returns: + True if credentials were successfully copied, False otherwise. + """ secret_path = Path("/app/.google_workspace_mcp/credentials/credentials.json") + + # Check if secret file exists if not secret_path.exists(): - logging.debug("Google OAuth credentials not found at %s, skipping setup", secret_path) - return + logging.debug("Google OAuth credentials not found at %s (placeholder secret or not mounted)", secret_path) + return False + + # Check if file has content (not empty placeholder) + try: + if secret_path.stat().st_size == 0: + logging.debug("Google OAuth credentials file is empty (user hasn't authenticated yet)") + return False + except OSError as e: + logging.debug("Could not stat Google OAuth credentials file: %s", e) + return False # Create writable credentials directory in workspace workspace_creds_dir = Path("/workspace/.google_workspace_mcp/credentials") @@ -1459,5 +1576,41 @@ async def _setup_google_credentials(self): # Make it writable so workspace-mcp can update tokens dest_path.chmod(0o644) logging.info("✓ Copied Google OAuth credentials from Secret to writable workspace at %s", dest_path) + return True except Exception as e: - logging.error("Failed to copy Google OAuth credentials: %s", e) \ No newline at end of file + logging.error("Failed to copy Google OAuth credentials: %s", e) + return False + + async def refresh_google_credentials(self) -> bool: + """Check for and copy new Google OAuth credentials. + + Call this method periodically (e.g., before processing a message) to detect + when a user completes the OAuth flow and credentials become available. + + Kubernetes automatically updates the mounted secret volume when the secret + changes (typically within ~60 seconds), so this will pick up new credentials + without requiring a pod restart. + + Returns: + True if new credentials were found and copied, False otherwise. + """ + dest_path = Path("/workspace/.google_workspace_mcp/credentials/credentials.json") + + # If we already have credentials in workspace, check if source is newer + if dest_path.exists(): + secret_path = Path("/app/.google_workspace_mcp/credentials/credentials.json") + if secret_path.exists(): + try: + # Compare modification times - secret mount updates when K8s syncs + if secret_path.stat().st_mtime > dest_path.stat().st_mtime: + logging.info("Detected updated Google OAuth credentials, refreshing...") + return await self._try_copy_google_credentials() + except OSError: + pass + return False + + # No credentials yet, try to copy + if await self._try_copy_google_credentials(): + logging.info("✓ Google OAuth credentials now available (user completed authentication)") + return True + return False \ No newline at end of file diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index 5c11699c..9ed76889 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -80,6 +80,7 @@ async def lifespan(app: FastAPI): # Import adapter here to avoid circular imports from adapter import ClaudeCodeAdapter + from pathlib import Path # Initialize context from environment session_id = os.getenv("SESSION_ID", "unknown") @@ -95,38 +96,55 @@ async def lifespan(app: FastAPI): adapter = ClaudeCodeAdapter() adapter.context = context + # Don't create persistent client here - adapter will create it on first request + # with proper options (MCP servers, system prompt, model, etc.) + # The client will then be stored in app.state for reuse + app.state.claude_client = None + app.state.claude_client_ctx = None + app.state.claude_lock = asyncio.Lock() # Serialize access to client + + logger.info("Client will be created on first request with full options") + + # Check if this is a restart (conversation history exists) + history_marker = Path(workspace_path) / ".claude" / "state" + # Check for INITIAL_PROMPT and auto-execute (only if this is first run) - # Skip if conversation history already exists (session restart) initial_prompt = os.getenv("INITIAL_PROMPT", "").strip() - if initial_prompt: - # Check if there's already conversation history - from pathlib import Path - history_marker = Path(workspace_path) / ".claude" / "state" - - if history_marker.exists(): - logger.info(f"INITIAL_PROMPT detected but conversation history exists - skipping auto-execution (session restart)") - else: - logger.info(f"INITIAL_PROMPT detected ({len(initial_prompt)} chars), will auto-execute") - asyncio.create_task(auto_execute_initial_prompt(initial_prompt, session_id)) + if initial_prompt and not history_marker.exists(): + logger.info(f"INITIAL_PROMPT detected ({len(initial_prompt)} chars), will auto-execute after 3s delay") + asyncio.create_task(auto_execute_initial_prompt(initial_prompt, session_id)) + elif initial_prompt: + logger.info(f"INITIAL_PROMPT detected but conversation history exists - skipping auto-execution (session restart)") logger.info(f"AG-UI server ready for session {session_id}") yield - # Cleanup - logger.info("Shutting down AG-UI server") + # Cleanup - disconnect the client if one was created + logger.info("Shutting down AG-UI server...") + if app.state.claude_client is not None: + logger.info("Disconnecting persistent Claude SDK client...") + try: + await app.state.claude_client.disconnect() + except Exception as e: + logger.warning(f"Error disconnecting client: {e}") + logger.info("Claude SDK client disconnected") async def auto_execute_initial_prompt(prompt: str, session_id: str): - """Auto-execute INITIAL_PROMPT by POSTing to backend (via Service). + """Auto-execute INITIAL_PROMPT by POSTing to backend after short delay. - We POST to the backend so events flow through the proxy and are visible in the UI. - Retries handle Service DNS propagation delays naturally. + The 3-second delay gives the runner time to fully start. Backend has retry + logic to handle if Service DNS isn't ready yet. """ import uuid import aiohttp - logger.info("Auto-executing INITIAL_PROMPT via backend POST (will retry until Service DNS is ready)...") + # Give runner time to fully start before backend tries to reach us + logger.info("Waiting 3s before auto-executing INITIAL_PROMPT (allow Service DNS to propagate)...") + await asyncio.sleep(3) + + logger.info("Auto-executing INITIAL_PROMPT via backend POST...") # Get backend URL from environment backend_url = os.getenv("BACKEND_API_URL", "").rstrip("/") @@ -134,9 +152,6 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str): if not backend_url or not project_name: logger.error("Cannot auto-execute INITIAL_PROMPT: BACKEND_API_URL or PROJECT_NAME not set") - logger.error(f" BACKEND_API_URL={os.getenv('BACKEND_API_URL', '(not set)')}") - logger.error(f" PROJECT_NAME={os.getenv('PROJECT_NAME', '(not set)')}") - logger.error(f" AGENTIC_SESSION_NAMESPACE={os.getenv('AGENTIC_SESSION_NAMESPACE', '(not set)')}") return # BACKEND_API_URL already includes /api suffix from operator @@ -153,7 +168,7 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str): "metadata": { "hidden": True, "autoSent": True, - "source": "runner_auto_execute" + "source": "runner_initial_prompt" } }] } @@ -164,34 +179,36 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str): if bot_token: headers["Authorization"] = f"Bearer {bot_token}" - # Retry aggressively - Service DNS can take 10-20 seconds to propagate - # First retry happens immediately, then we back off - max_retries = 10 - - for attempt in range(max_retries): - # Exponential backoff: 0, 2, 3, 4, 5, 5, 5... seconds - if attempt > 0: - delay = min(2 + attempt, 5) - await asyncio.sleep(delay) - - try: - async with aiohttp.ClientSession() as session: - async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp: - if resp.status == 200: - result = await resp.json() - logger.info(f"INITIAL_PROMPT auto-execution started: {result}") - return - else: - error_text = await resp.text() - logger.warning(f"INITIAL_PROMPT attempt {attempt + 1}/{max_retries} failed: {resp.status} - {error_text[:200]}") - - except aiohttp.ClientConnectorError as e: - # Connection error - likely Service DNS not ready yet - logger.info(f"INITIAL_PROMPT attempt {attempt + 1}/{max_retries}: Service not ready yet, will retry...") - except Exception as e: - logger.warning(f"INITIAL_PROMPT attempt {attempt + 1}/{max_retries} error: {e}") + try: + async with aiohttp.ClientSession() as session: + async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp: + if resp.status == 200: + result = await resp.json() + logger.info(f"INITIAL_PROMPT auto-execution started: {result}") + else: + error_text = await resp.text() + logger.warning(f"INITIAL_PROMPT failed with status {resp.status}: {error_text[:200]}") + except Exception as e: + logger.warning(f"INITIAL_PROMPT auto-execution error (backend will retry): {e}") + + +async def invalidate_persistent_client(app): + """Destroy persistent client so new one is created with new options. - logger.error(f"Failed to auto-execute INITIAL_PROMPT after {max_retries} attempts (Service DNS may not have propagated)") + Call this when workflow or repos change and client needs new configuration. + """ + async with app.state.claude_lock: + if app.state.claude_client is not None: + logger.info("🔄 Invalidating persistent client (workflow/repo change)...") + try: + await app.state.claude_client.disconnect() + except Exception as e: + logger.warning(f"Error disconnecting old client: {e}") + app.state.claude_client = None + app.state.claude_client_ctx = None + logger.info("✅ Persistent client invalidated - will recreate on next request") + else: + logger.info("No persistent client to invalidate") app = FastAPI( @@ -245,10 +262,16 @@ async def event_generator(): _adapter_initialized = True logger.info("Starting adapter.process_run()...") - # Process the actual run - async for event in adapter.process_run(run_agent_input): - logger.debug(f"Yielding run event: {event.type}") - yield encoder.encode(event) + + # Use lock to serialize access to the persistent client + async with request.app.state.claude_lock: + # Pass app.state so adapter can create/reuse persistent client + async for event in adapter.process_run( + run_agent_input, + app_state=request.app.state + ): + logger.debug(f"Yielding run event: {event.type}") + yield encoder.encode(event) logger.info("adapter.process_run() completed") except Exception as e: logger.error(f"Error in event generator: {e}") @@ -323,7 +346,10 @@ async def change_workflow(request: Request): _adapter_initialized = False adapter._first_run = True - logger.info("Workflow updated, adapter will reinitialize on next run") + # Destroy persistent client so new one is created with new options + await invalidate_persistent_client(request.app) + + logger.info("Workflow updated, persistent client invalidated - will recreate on next run") # Trigger a new run to greet user with workflow context # This runs in background via backend POST @@ -431,7 +457,10 @@ async def add_repo(request: Request): _adapter_initialized = False adapter._first_run = True - logger.info(f"Repo added, adapter will reinitialize on next run") + # Destroy persistent client so new one is created with new repo config + await invalidate_persistent_client(request.app) + + logger.info(f"Repo added, persistent client invalidated - will recreate on next run") return {"message": "Repository added"} @@ -468,7 +497,10 @@ async def remove_repo(request: Request): _adapter_initialized = False adapter._first_run = True - logger.info(f"Repo removed, adapter will reinitialize on next run") + # Destroy persistent client so new one is created with new repo config + await invalidate_persistent_client(request.app) + + logger.info(f"Repo removed, persistent client invalidated - will recreate on next run") return {"message": "Repository removed"} From 20ef2ff5c7b5a99e7de0b3f73be2e37953c21c42 Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Thu, 18 Dec 2025 15:38:13 -0600 Subject: [PATCH 3/3] Refactor ClaudeCodeAdapter to eliminate persistent client support Updated the ClaudeCodeAdapter to create a fresh client for each run, simplifying the client management process. Removed the app_state dependency for client persistence, ensuring that each request operates independently. Adjusted related logging and cleanup procedures to reflect this change, enhancing reliability and clarity in client handling. --- .../runners/claude-code-runner/adapter.py | 127 +++++------------- components/runners/claude-code-runner/main.py | 64 ++------- 2 files changed, 41 insertions(+), 150 deletions(-) diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 768cfbf7..419e493d 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -107,7 +107,7 @@ def _timestamp(self) -> str: """Return current UTC timestamp in ISO format.""" return datetime.now(timezone.utc).isoformat() - async def process_run(self, input_data: RunAgentInput, app_state: Optional[Any] = None) -> AsyncIterator[BaseEvent]: + async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: """ Process a run and yield AG-UI events. @@ -214,7 +214,7 @@ async def process_run(self, input_data: RunAgentInput, app_state: Optional[Any] # Run Claude SDK and yield events logger.info(f"Starting Claude SDK with prompt: '{user_message[:50]}...'") - async for event in self._run_claude_agent_sdk(user_message, thread_id, run_id, app_state=app_state): + async for event in self._run_claude_agent_sdk(user_message, thread_id, run_id): yield event logger.info(f"Claude SDK processing completed for run {run_id}") @@ -280,23 +280,18 @@ def _extract_user_message(self, input_data: RunAgentInput) -> str: return "" async def _run_claude_agent_sdk( - self, prompt: str, thread_id: str, run_id: str, app_state: Optional[Any] = None + self, prompt: str, thread_id: str, run_id: str ) -> AsyncIterator[BaseEvent]: """Execute the Claude Code SDK with the given prompt and yield AG-UI events. + Creates a fresh client for each run - simpler and more reliable than client reuse. + Args: prompt: The user prompt to send to Claude thread_id: AG-UI thread identifier run_id: AG-UI run identifier - app_state: Optional FastAPI app.state for persistent client storage/reuse """ - # Check if we have a persistent client in app_state - has_persistent_client = ( - app_state is not None and - hasattr(app_state, 'claude_client') and - app_state.claude_client is not None - ) - logger.info(f"_run_claude_agent_sdk called with prompt length={len(prompt)}, has_persistent_client={has_persistent_client}") + logger.info(f"_run_claude_agent_sdk called with prompt length={len(prompt)}, will create fresh client") try: # Check for authentication method logger.info("Checking authentication configuration...") @@ -484,85 +479,29 @@ def create_sdk_client(opts, disable_continue=False): opts.continue_conversation = False return ClaudeSDKClient(options=opts) - # Check if we can reuse persistent client from app_state - client_is_alive = False - if has_persistent_client: - # Check if the underlying subprocess is still alive - client = app_state.claude_client - try: - # The SDK client has an internal transport with a subprocess - transport = getattr(client, '_transport', None) - if transport: - proc = getattr(transport, '_process', None) or getattr(transport, 'process', None) - if proc and hasattr(proc, 'returncode'): - client_is_alive = proc.returncode is None - if not client_is_alive: - logger.warning(f"Persistent client subprocess exited (code={proc.returncode}), will create new one") - else: - # Can't check, assume alive - client_is_alive = True - else: - # No transport, assume alive - client_is_alive = True - except Exception as e: - logger.warning(f"Could not check client subprocess status: {e}") - client_is_alive = False - - if has_persistent_client and client_is_alive: - # Reusing persistent client from app.state - logger.info("♻️ Reusing persistent Claude SDK client (conversation continuity)") - client = app_state.claude_client - client_ctx = None # Not managing lifecycle - client is persistent - yield RawEvent( - type=EventType.RAW, - thread_id=thread_id, - run_id=run_id, - event={"type": "system_log", "message": "♻️ Reusing persistent client (instant startup!)"} - ) - else: - # Clear stale persistent client if subprocess died - if has_persistent_client and not client_is_alive: - logger.info("Clearing stale persistent client") - # Try to disconnect cleanly - try: - if app_state.claude_client: - await app_state.claude_client.disconnect() - except Exception as e: - logger.warning(f"Error disconnecting stale client: {e}") - app_state.claude_client = None - app_state.claude_client_ctx = None - - # Create new client with full options using explicit connect() pattern - # (matches SDK docs example for continuous conversation) - logger.info("Creating new ClaudeSDKClient with full options...") - - try: - logger.info("Creating ClaudeSDKClient...") - client = create_sdk_client(options) - logger.info("Connecting ClaudeSDKClient (initializing subprocess)...") + # Always create a fresh client for each run (simple and reliable) + logger.info("Creating new ClaudeSDKClient for this run...") + + try: + logger.info("Creating ClaudeSDKClient...") + client = create_sdk_client(options) + logger.info("Connecting ClaudeSDKClient (initializing subprocess)...") + await client.connect() + logger.info("ClaudeSDKClient connected successfully!") + except Exception as resume_error: + error_str = str(resume_error).lower() + if "no conversation found" in error_str or "session" in error_str: + logger.warning(f"Conversation continuation failed: {resume_error}") + yield RawEvent( + type=EventType.RAW, + thread_id=thread_id, + run_id=run_id, + event={"type": "system_log", "message": "⚠️ Could not continue conversation, starting fresh..."} + ) + client = create_sdk_client(options, disable_continue=True) await client.connect() - logger.info("ClaudeSDKClient connected successfully!") - client_ctx = None # Using explicit connect/disconnect, not context manager - except Exception as resume_error: - error_str = str(resume_error).lower() - if "no conversation found" in error_str or "session" in error_str: - logger.warning(f"Conversation continuation failed: {resume_error}") - yield RawEvent( - type=EventType.RAW, - thread_id=thread_id, - run_id=run_id, - event={"type": "system_log", "message": "⚠️ Could not continue conversation, starting fresh..."} - ) - client = create_sdk_client(options, disable_continue=True) - await client.connect() - else: - raise - - # Store in app_state for reuse in subsequent requests - if app_state is not None: - logger.info("✅ Storing persistent client in app_state for future requests") - app_state.claude_client = client - app_state.claude_client_ctx = None # Using explicit connect/disconnect + else: + raise try: # Store client reference for interrupt support @@ -791,12 +730,10 @@ def create_sdk_client(opts, disable_continue=False): # Clear active client reference (interrupt no longer valid for this run) self._active_client = None - # Only destroy client if we created it (not reusing persistent one) - if client_ctx is not None: - logger.info("Cleaning up newly created client for this request") - await client_ctx.__aexit__(None, None, None) - else: - logger.info("Keeping persistent client alive for next request") + # Always disconnect client at end of run (no persistence) + if client is not None: + logger.info("Disconnecting client (end of run)") + await client.disconnect() # Finalize observability await obs.finalize() diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index 9ed76889..dee4e6c9 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -96,14 +96,7 @@ async def lifespan(app: FastAPI): adapter = ClaudeCodeAdapter() adapter.context = context - # Don't create persistent client here - adapter will create it on first request - # with proper options (MCP servers, system prompt, model, etc.) - # The client will then be stored in app.state for reuse - app.state.claude_client = None - app.state.claude_client_ctx = None - app.state.claude_lock = asyncio.Lock() # Serialize access to client - - logger.info("Client will be created on first request with full options") + logger.info("Adapter initialized - fresh client will be created for each run") # Check if this is a restart (conversation history exists) history_marker = Path(workspace_path) / ".claude" / "state" @@ -120,15 +113,8 @@ async def lifespan(app: FastAPI): yield - # Cleanup - disconnect the client if one was created + # Cleanup logger.info("Shutting down AG-UI server...") - if app.state.claude_client is not None: - logger.info("Disconnecting persistent Claude SDK client...") - try: - await app.state.claude_client.disconnect() - except Exception as e: - logger.warning(f"Error disconnecting client: {e}") - logger.info("Claude SDK client disconnected") async def auto_execute_initial_prompt(prompt: str, session_id: str): @@ -192,24 +178,6 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str): logger.warning(f"INITIAL_PROMPT auto-execution error (backend will retry): {e}") -async def invalidate_persistent_client(app): - """Destroy persistent client so new one is created with new options. - - Call this when workflow or repos change and client needs new configuration. - """ - async with app.state.claude_lock: - if app.state.claude_client is not None: - logger.info("🔄 Invalidating persistent client (workflow/repo change)...") - try: - await app.state.claude_client.disconnect() - except Exception as e: - logger.warning(f"Error disconnecting old client: {e}") - app.state.claude_client = None - app.state.claude_client_ctx = None - logger.info("✅ Persistent client invalidated - will recreate on next request") - else: - logger.info("No persistent client to invalidate") - app = FastAPI( title="Claude Code AG-UI Server", @@ -263,15 +231,10 @@ async def event_generator(): logger.info("Starting adapter.process_run()...") - # Use lock to serialize access to the persistent client - async with request.app.state.claude_lock: - # Pass app.state so adapter can create/reuse persistent client - async for event in adapter.process_run( - run_agent_input, - app_state=request.app.state - ): - logger.debug(f"Yielding run event: {event.type}") - yield encoder.encode(event) + # Process the run (creates fresh client each time) + async for event in adapter.process_run(run_agent_input): + logger.debug(f"Yielding run event: {event.type}") + yield encoder.encode(event) logger.info("adapter.process_run() completed") except Exception as e: logger.error(f"Error in event generator: {e}") @@ -346,10 +309,7 @@ async def change_workflow(request: Request): _adapter_initialized = False adapter._first_run = True - # Destroy persistent client so new one is created with new options - await invalidate_persistent_client(request.app) - - logger.info("Workflow updated, persistent client invalidated - will recreate on next run") + logger.info("Workflow updated, adapter will reinitialize on next run") # Trigger a new run to greet user with workflow context # This runs in background via backend POST @@ -457,10 +417,7 @@ async def add_repo(request: Request): _adapter_initialized = False adapter._first_run = True - # Destroy persistent client so new one is created with new repo config - await invalidate_persistent_client(request.app) - - logger.info(f"Repo added, persistent client invalidated - will recreate on next run") + logger.info(f"Repo added, adapter will reinitialize on next run") return {"message": "Repository added"} @@ -497,10 +454,7 @@ async def remove_repo(request: Request): _adapter_initialized = False adapter._first_run = True - # Destroy persistent client so new one is created with new repo config - await invalidate_persistent_client(request.app) - - logger.info(f"Repo removed, persistent client invalidated - will recreate on next run") + logger.info(f"Repo removed, adapter will reinitialize on next run") return {"message": "Repository removed"}