100 changes: 72 additions & 28 deletions components/backend/handlers/sessions.go
@@ -14,6 +14,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"
"unicode/utf8"

@@ -46,6 +47,20 @@ var (

const runnerTokenRefreshedAtAnnotation = "ambient-code.io/token-refreshed-at"

// ootbWorkflowsCache provides in-memory caching for OOTB workflows to avoid GitHub API rate limits.
// The cache stores workflows by repo URL key and expires after ootbCacheTTL.
type ootbWorkflowsCache struct {
mu sync.RWMutex
workflows []OOTBWorkflow
cachedAt time.Time
cacheKey string // repo+branch+path combination
}

var (
ootbCache = &ootbWorkflowsCache{}
ootbCacheTTL = 5 * time.Minute // Cache OOTB workflows for 5 minutes
)

// isBinaryContentType checks if a MIME type represents binary content that should be base64 encoded.
// This includes images, archives, documents, executables, and other non-text formats.
func isBinaryContentType(contentType string) bool {
@@ -1718,35 +1733,10 @@ type OOTBWorkflow struct {
}

// ListOOTBWorkflows returns the list of out-of-the-box workflows dynamically discovered from GitHub
// Attempts to use user's GitHub token for better rate limits, falls back to unauthenticated for public repos
// Uses in-memory caching (5 min TTL) to avoid GitHub API rate limits.
// Attempts to use the user's GitHub token for better rate limits when a cache miss occurs.
// GET /api/workflows/ootb?project=<projectName>
func ListOOTBWorkflows(c *gin.Context) {
// Try to get user's GitHub token (best effort - not required)
// This gives better rate limits (5000/hr vs 60/hr) and supports private repos
// Project is optional - if provided, we'll try to get the user's token
token := ""
project := c.Query("project") // Optional query parameter
if project != "" {
usrID, _ := c.Get("userID")
k8sClt, sessDyn := GetK8sClientsForRequest(c)
if k8sClt == nil || sessDyn == nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid or missing token"})
c.Abort()
return
}
if userIDStr, ok := usrID.(string); ok && userIDStr != "" {
if githubToken, err := GetGitHubToken(c.Request.Context(), k8sClt, sessDyn, project, userIDStr); err == nil {
token = githubToken
log.Printf("ListOOTBWorkflows: using user's GitHub token for project %s (better rate limits)", project)
} else {
log.Printf("ListOOTBWorkflows: failed to get GitHub token for project %s: %v", project, err)
}
}
}
if token == "" {
log.Printf("ListOOTBWorkflows: proceeding without GitHub token (public repo, lower rate limits)")
}

// Read OOTB repo configuration from environment
ootbRepo := strings.TrimSpace(os.Getenv("OOTB_WORKFLOWS_REPO"))
if ootbRepo == "" {
@@ -1763,6 +1753,43 @@ func ListOOTBWorkflows(c *gin.Context) {
ootbWorkflowsPath = "workflows"
}

// Build cache key from repo configuration
cacheKey := fmt.Sprintf("%s|%s|%s", ootbRepo, ootbBranch, ootbWorkflowsPath)

// Check cache first (read lock)
ootbCache.mu.RLock()
if ootbCache.cacheKey == cacheKey && time.Since(ootbCache.cachedAt) < ootbCacheTTL && len(ootbCache.workflows) > 0 {
workflows := ootbCache.workflows
ootbCache.mu.RUnlock()
log.Printf("ListOOTBWorkflows: returning %d cached workflows (age: %v)", len(workflows), time.Since(ootbCache.cachedAt).Round(time.Second))
c.JSON(http.StatusOK, gin.H{"workflows": workflows})
return
}
ootbCache.mu.RUnlock()

// Cache miss - need to fetch from GitHub
// Try to get user's GitHub token (best effort - not required)
// This gives better rate limits (5000/hr vs 60/hr) and supports private repos
token := ""
project := c.Query("project") // Optional query parameter
if project != "" {
usrID, _ := c.Get("userID")
k8sClt, sessDyn := GetK8sClientsForRequest(c)
if k8sClt != nil && sessDyn != nil {
if userIDStr, ok := usrID.(string); ok && userIDStr != "" {
if githubToken, err := GetGitHubToken(c.Request.Context(), k8sClt, sessDyn, project, userIDStr); err == nil {
token = githubToken
log.Printf("ListOOTBWorkflows: using user's GitHub token for project %s (better rate limits)", project)
} else {
log.Printf("ListOOTBWorkflows: failed to get GitHub token for project %s: %v", project, err)
}
}
}
}
if token == "" {
log.Printf("ListOOTBWorkflows: proceeding without GitHub token (public repo, lower rate limits)")
}

// Parse GitHub URL
owner, repoName, err := git.ParseGitHubURL(ootbRepo)
if err != nil {
@@ -1775,6 +1802,16 @@
entries, err := fetchGitHubDirectoryListing(c.Request.Context(), owner, repoName, ootbBranch, ootbWorkflowsPath, token)
if err != nil {
log.Printf("ListOOTBWorkflows: failed to list workflows directory: %v", err)
// On error, try to return stale cache if available
ootbCache.mu.RLock()
if len(ootbCache.workflows) > 0 && ootbCache.cacheKey == cacheKey {
workflows := ootbCache.workflows
ootbCache.mu.RUnlock()
log.Printf("ListOOTBWorkflows: returning stale cached workflows due to GitHub error")
c.JSON(http.StatusOK, gin.H{"workflows": workflows})
return
}
ootbCache.mu.RUnlock()
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to discover OOTB workflows"})
return
}
@@ -1822,7 +1859,14 @@
})
}

log.Printf("ListOOTBWorkflows: discovered %d workflows from %s", len(workflows), ootbRepo)
// Update cache (write lock)
ootbCache.mu.Lock()
ootbCache.workflows = workflows
ootbCache.cachedAt = time.Now()
ootbCache.cacheKey = cacheKey
ootbCache.mu.Unlock()

log.Printf("ListOOTBWorkflows: discovered %d workflows from %s (cached for %v)", len(workflows), ootbRepo, ootbCacheTTL)
c.JSON(http.StatusOK, gin.H{"workflows": workflows})
}
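
A minimal test sketch (not part of this PR) of the cache-freshness rule used above: an entry counts as fresh only when the key matches, its age is under ootbCacheTTL, and at least one workflow is stored. It assumes a _test.go file in the same handlers package with "testing" and "time" imported; the TestOOTBCacheFreshness name is hypothetical.

func TestOOTBCacheFreshness(t *testing.T) {
	// An entry older than ootbCacheTTL (5 minutes) must be treated as stale
	// even though the cache key matches and workflows are present.
	cache := &ootbWorkflowsCache{
		workflows: []OOTBWorkflow{{}},
		cachedAt:  time.Now().Add(-10 * time.Minute),
		cacheKey:  "repo|main|workflows",
	}
	fresh := cache.cacheKey == "repo|main|workflows" &&
		time.Since(cache.cachedAt) < ootbCacheTTL &&
		len(cache.workflows) > 0
	if fresh {
		t.Fatal("expected an entry older than the TTL to be considered stale")
	}
}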

66 changes: 49 additions & 17 deletions components/backend/websocket/agui_proxy.go
@@ -147,26 +147,58 @@ func HandleAGUIRunProxy(c *gin.Context) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Hour)
defer cancel()

proxyReq, err := http.NewRequestWithContext(ctx, "POST", runnerURL, bytes.NewReader(bodyBytes))
if err != nil {
log.Printf("AGUI Proxy: Failed to create request in background: %v", err)
updateRunStatus(runID, "error")
return
}

// Forward headers
proxyReq.Header.Set("Content-Type", "application/json")
proxyReq.Header.Set("Accept", "text/event-stream")

// Execute request
// Execute request with retries (runner may not be ready immediately after startup)
client := &http.Client{
Timeout: 0, // No timeout, context handles it
}
resp, err := client.Do(proxyReq)
if err != nil {
log.Printf("AGUI Proxy: Background request failed: %v", err)
updateRunStatus(runID, "error")
return

var resp *http.Response
maxRetries := 15
retryDelay := 500 * time.Millisecond

for attempt := 1; attempt <= maxRetries; attempt++ {
// Create a fresh request for each attempt (the body reader is consumed on each send)
proxyReq, err := http.NewRequestWithContext(ctx, "POST", runnerURL, bytes.NewReader(bodyBytes))
if err != nil {
log.Printf("AGUI Proxy: Failed to create request in background: %v", err)
updateRunStatus(runID, "error")
return
}

// Forward headers
proxyReq.Header.Set("Content-Type", "application/json")
proxyReq.Header.Set("Accept", "text/event-stream")

resp, err = client.Do(proxyReq)
if err == nil {
break // Success!
}

// Check if it's a connection refused error (runner not ready yet)
errStr := err.Error()
isConnectionRefused := strings.Contains(errStr, "connection refused") ||
strings.Contains(errStr, "no such host") ||
strings.Contains(errStr, "dial tcp")

if !isConnectionRefused || attempt == maxRetries {
log.Printf("AGUI Proxy: Background request failed after %d attempts: %v", attempt, err)
updateRunStatus(runID, "error")
return
}

log.Printf("AGUI Proxy: Runner not ready (attempt %d/%d), retrying in %v...", attempt, maxRetries, retryDelay)

select {
case <-ctx.Done():
log.Printf("AGUI Proxy: Context cancelled during retry for run %s", runID)
return
case <-time.After(retryDelay):
// Exponential backoff with cap at 5 seconds
retryDelay = time.Duration(float64(retryDelay) * 1.5)
if retryDelay > 5*time.Second {
retryDelay = 5 * time.Second
}
}
}
defer resp.Body.Close()
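
As a hedged aside (not part of the diff), the backoff arithmetic in the loop above can be isolated into a small helper; nextRetryDelay is a hypothetical name, assuming "time" is imported:

// nextRetryDelay grows the delay by 1.5x per attempt and caps it at 5 seconds,
// mirroring the inline calculation in the retry loop above.
func nextRetryDelay(current time.Duration) time.Duration {
	next := time.Duration(float64(current) * 1.5)
	if next > 5*time.Second {
		next = 5 * time.Second
	}
	return next
}

Starting from 500ms, the waits are 500ms, 750ms, 1.125s, ~1.69s, ~2.53s, ~3.8s, then 5s for every later attempt, so the 15 attempts add up to roughly 50 seconds of waiting in the worst case.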
