diff --git a/.env.example b/.env.example
index 72351421..a9e93030 100644
--- a/.env.example
+++ b/.env.example
@@ -85,6 +85,35 @@
# Path to your iFlow credential file (e.g., ~/.iflow/oauth_creds.json).
#IFLOW_OAUTH_1=""
+# --- Kiro CLI / Kiro IDE ---
+# Multiple credential sources are supported (in priority order):
+#
+# Option 1: Direct refresh token (simplest for Docker/stateless deployments)
+#KIRO_REFRESH_TOKEN="your_kiro_refresh_token"
+# or
+#REFRESH_TOKEN="your_kiro_refresh_token"
+#
+# Option 2: JSON credentials file (from Kiro IDE)
+# Path to kiro-auth-token.json from Kiro IDE
+#KIRO_CREDS_FILE="~/.aws/sso/cache/kiro-auth-token.json"
+#
+# Option 3: SQLite database (from kiro-cli)
+# Path to kiro-cli SQLite database
+#KIRO_CLI_DB_FILE=""
+#
+# If none are set, the proxy will auto-detect from default paths:
+# JSON: ~/.aws/sso/cache/kiro-auth-token.json
+# SQLite (macOS): ~/Library/Application Support/kiro-cli/data.sqlite3
+# SQLite (macOS): ~/Library/Application Support/amazon-q/data.sqlite3
+# SQLite (Linux): ~/.local/share/kiro-cli/data.sqlite3
+# SQLite (Linux): ~/.local/share/amazon-q/data.sqlite3
+
+# Optional profile ARN (only needed for Kiro Desktop auth, not for AWS SSO; KIRO_PROFILE_ARN works as an alias)
+#PROFILE_ARN="arn:aws:codewhisperer:us-east-1:..."
+
+# Optional override for Kiro API region (default: us-east-1)
+#KIRO_REGION="us-east-1"
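+
+# Optional: set to "false" to disable dynamic model discovery via the Kiro API
+#KIRO_CLI_ENABLE_DYNAMIC_MODELS="true"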
+
# ------------------------------------------------------------------------------
# | [ADVANCED] Provider-Specific Settings |
diff --git a/.gitignore b/.gitignore
index 3711fdfd..ab3d32c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -131,4 +131,4 @@ cache/
*.env
oauth_creds/
-
+reference/
diff --git a/Deployment guide.md b/Deployment guide.md
index 44c7e033..17d85e05 100644
--- a/Deployment guide.md
+++ b/Deployment guide.md
@@ -21,11 +21,11 @@ Before deploying, try the proxy locally to ensure your keys work. This uses a pr
2. Download the latest release ZIP file (e.g., for Windows).
3. Unzip the file.
4. Double-click `setup_env.bat`. A window will open—follow the prompts to add your PROXY_API_KEY (a strong secret you create) and provider keys. Use the [LiteLLM Providers Documentation](https://docs.litellm.ai/docs/providers) for guidance on key formats (e.g., `GEMINI_API_KEY_1="your-key"`).
-5. Double-click `proxy_app.exe` to start the proxy. It runs at `http://127.0.0.1:8000`—visit in a browser to confirm "API Key Proxy is running".
+5. Double-click `proxy_app.exe` to start the proxy. It runs at `http://127.0.0.1:7777`—visit in a browser to confirm "API Key Proxy is running".
6. Test with curl (replace with your PROXY_API_KEY):
```
-curl -X POST http://127.0.0.1:8000/v1/chat/completions -H "Content-Type: application/json" -H "Authorization: Bearer your-proxy-key" -d '{"model": "gemini/gemini-2.5-flash", "messages": [{"role": "user", "content": "What is the capital of France?"}]}'
+curl -X POST http://127.0.0.1:7777/v1/chat/completions -H "Content-Type: application/json" -H "Authorization: Bearer your-proxy-key" -d '{"model": "gemini/gemini-2.5-flash", "messages": [{"role": "user", "content": "What is the capital of France?"}]}'
```
- Expected: A JSON response with the answer (e.g., "Paris").
@@ -220,7 +220,7 @@ docker compose ps
docker compose logs -f
# Test the endpoint
-curl http://localhost:8000/
+curl http://localhost:7777/
```
### Manual Docker Run
@@ -236,7 +236,7 @@ touch key_usage.json
docker run -d \
--name llm-api-proxy \
--restart unless-stopped \
- -p 8000:8000 \
+ -p 7777:7777 \
-v $(pwd)/.env:/app/.env:ro \
-v $(pwd)/oauth_creds:/app/oauth_creds \
-v $(pwd)/logs:/app/logs \
@@ -378,7 +378,7 @@ The image is built for both `linux/amd64` and `linux/arm64` architectures, so it
| Container exits immediately | Check logs: `docker compose logs` — likely missing `.env` or invalid config |
| Permission denied on volumes | Ensure directories exist and have correct permissions: `mkdir -p oauth_creds logs && chmod 755 oauth_creds logs` |
| OAuth credentials not loading | Verify `oauth_creds/` is mounted and contains valid JSON files, or check environment variables are set |
-| Port already in use | Change the port mapping: `-p 9000:8000` or edit `docker-compose.yml` |
+| Port already in use | Change the port mapping: `-p 9000:7777` or edit `docker-compose.yml` |
| Image not updating | Force pull: `docker compose pull && docker compose up -d` |
---
@@ -488,7 +488,7 @@ nano .env
# Also add your PROXY_API_KEY and other provider keys
# Start the proxy
-uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 8000
+uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 7777
```
**Method B: Upload Credential Files**
@@ -501,7 +501,7 @@ scp -r oauth_creds/ user@your-vps-ip:/path/to/LLM-API-Key-Proxy/
ls -la oauth_creds/
# Start the proxy
-uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 8000
+uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 7777
```
> **Note**: Environment variables are preferred for production deployments (more secure, easier to manage, works with container orchestration).
@@ -572,7 +572,7 @@ Still in the credential tool:
# Tunnels are no longer needed
# Start the proxy on VPS (in a screen/tmux session or as a service)
-uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 8000
+uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 7777
```
---
@@ -650,7 +650,7 @@ OAuth callback ports should **never** be publicly exposed:
# sudo ufw allow 11451/tcp
# ✅ Only open your proxy API port
-sudo ufw allow 8000/tcp
+sudo ufw allow 7777/tcp
# Check firewall status
sudo ufw status
@@ -677,7 +677,7 @@ Type=simple
User=your-username
WorkingDirectory=/path/to/LLM-API-Key-Proxy
Environment="PATH=/path/to/python/bin"
-ExecStart=/path/to/python/bin/uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 8000
+ExecStart=/path/to/python/bin/uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 7777
Restart=always
RestartSec=10
diff --git a/Dockerfile b/Dockerfile
index aafcb117..35387ab5 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -38,7 +38,7 @@ COPY src/ ./src/
RUN mkdir -p logs oauth_creds
# Expose the default port
-EXPOSE 8000
+EXPOSE 7777
# Set environment variables
ENV PYTHONUNBUFFERED=1
@@ -46,4 +46,4 @@ ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONPATH=/app/src
# Default command - runs proxy with the correct PYTHONPATH
-CMD ["python", "src/proxy_app/main.py", "--port", "8000"]
+CMD ["python", "src/proxy_app/main.py", "--port", "7777"]
diff --git a/README.md b/README.md
index a7c3c438..2a33da59 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ This project consists of two components:
- **One Endpoint, Many Providers** — Configure Gemini, OpenAI, Anthropic, and [any LiteLLM-supported provider](https://docs.litellm.ai/docs/providers) once. Access them all through a single API key
- **Anthropic API Compatible** — Use Claude Code or any Anthropic SDK client with non-Anthropic providers like Gemini, OpenAI, or custom models
- **Built-in Resilience** — Automatic key rotation, failover on errors, rate limit handling, and intelligent cooldowns
-- **Exclusive Provider Support** — Includes custom providers not available elsewhere: **Antigravity** (Gemini 3 + Claude Sonnet/Opus 4.5), **Gemini CLI**, **Qwen Code**, and **iFlow**
+- **Exclusive Provider Support** — Includes custom providers not available elsewhere: **Antigravity** (Gemini 3 + Claude Sonnet/Opus 4.5), **Gemini CLI**, **Kiro CLI**, **Qwen Code**, and **iFlow**
---
@@ -49,7 +49,7 @@ chmod +x proxy_app
# Pull and run directly
docker run -d \
--name llm-api-proxy \
- -p 8000:8000 \
+ -p 7777:7777 \
-v $(pwd)/.env:/app/.env:ro \
-v $(pwd)/oauth_creds:/app/oauth_creds \
-v $(pwd)/logs:/app/logs \
@@ -81,7 +81,7 @@ pip install -r requirements.txt
python src/proxy_app/main.py
```
-> **Tip:** Running with command-line arguments (e.g., `--host 0.0.0.0 --port 8000`) bypasses the TUI and starts the proxy directly.
+> **Tip:** Running with command-line arguments (e.g., `--host 0.0.0.0 --port 7777`) bypasses the TUI and starts the proxy directly.
---
@@ -91,7 +91,7 @@ Once the proxy is running, configure your application with these settings:
| Setting | Value |
|---------|-------|
-| **Base URL / API Endpoint** | `http://127.0.0.1:8000/v1` |
+| **Base URL / API Endpoint** | `http://127.0.0.1:7777/v1` |
| **API Key** | Your `PROXY_API_KEY` |
### Model Format: `provider/model_name`
@@ -104,6 +104,7 @@ openai/gpt-4o ← OpenAI API
anthropic/claude-3-5-sonnet ← Anthropic API
openrouter/anthropic/claude-3-opus ← OpenRouter
gemini_cli/gemini-2.5-pro ← Gemini CLI (OAuth)
+kiro_cli/claude-sonnet-4.5 ← Kiro CLI (Amazon Q/Kiro)
antigravity/gemini-3-pro-preview ← Antigravity (Gemini 3, Claude Opus 4.5)
```
@@ -116,7 +117,7 @@ antigravity/gemini-3-pro-preview ← Antigravity (Gemini 3, Claude Opus 4.5)
from openai import OpenAI
client = OpenAI(
- base_url="http://127.0.0.1:8000/v1",
+ base_url="http://127.0.0.1:7777/v1",
api_key="your-proxy-api-key"
)
@@ -133,7 +134,7 @@ print(response.choices[0].message.content)
curl
```bash
-curl -X POST http://127.0.0.1:8000/v1/chat/completions \
+curl -X POST http://127.0.0.1:7777/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-proxy-api-key" \
-d '{
@@ -150,7 +151,7 @@ curl -X POST http://127.0.0.1:8000/v1/chat/completions \
1. Go to **API Settings**
2. Select **"Proxy"** or **"Custom OpenAI"** mode
3. Configure:
- - **API URL:** `http://127.0.0.1:8000/v1`
+ - **API URL:** `http://127.0.0.1:7777/v1`
- **API Key:** Your `PROXY_API_KEY`
- **Model:** `provider/model_name` (e.g., `gemini/gemini-2.5-flash`)
4. Save and start chatting
@@ -169,7 +170,7 @@ In your configuration file (e.g., `config.json`):
"title": "Gemini via Proxy",
"provider": "openai",
"model": "gemini/gemini-2.5-flash",
- "apiBase": "http://127.0.0.1:8000/v1",
+ "apiBase": "http://127.0.0.1:7777/v1",
"apiKey": "your-proxy-api-key"
}
]
@@ -187,7 +188,7 @@ Claude Code natively supports custom Anthropic API endpoints. The recommended se
{
"env": {
"ANTHROPIC_AUTH_TOKEN": "your-proxy-api-key",
- "ANTHROPIC_BASE_URL": "http://127.0.0.1:8000",
+ "ANTHROPIC_BASE_URL": "http://127.0.0.1:7777",
"ANTHROPIC_DEFAULT_OPUS_MODEL": "gemini/gemini-3-pro",
"ANTHROPIC_DEFAULT_SONNET_MODEL": "gemini/gemini-3-flash",
"ANTHROPIC_DEFAULT_HAIKU_MODEL": "openai/gpt-5-mini"
@@ -206,7 +207,7 @@ Now you can use Claude Code with Gemini, OpenAI, or any other configured provide
from anthropic import Anthropic
client = Anthropic(
- base_url="http://127.0.0.1:8000",
+ base_url="http://127.0.0.1:7777",
api_key="your-proxy-api-key"
)
@@ -594,6 +595,62 @@ TIMEOUT_READ_NON_STREAMING=600 # Full response wait (10 min)
## OAuth Providers
+
+Kiro CLI / Kiro IDE (Amazon Q / Kiro)
+
+The proxy supports multiple Kiro credential sources, from either the Kiro IDE or kiro-cli.
+
+**Setup (choose one):**
+
+**Option 1: Direct Refresh Token** (best for Docker/stateless deployments)
+```env
+KIRO_REFRESH_TOKEN="your_kiro_refresh_token"
+# or
+REFRESH_TOKEN="your_kiro_refresh_token"
+
+# Optional (only for Kiro Desktop auth, not needed for AWS SSO)
+PROFILE_ARN="arn:aws:codewhisperer:us-east-1:..."
+```
+
+**Option 2: Kiro IDE (JSON credentials)** - Recommended for local use
+1. Open Kiro IDE and sign in
+2. The proxy auto-detects `~/.aws/sso/cache/kiro-auth-token.json`
+```env
+# Or specify explicitly:
+KIRO_CREDS_FILE="~/.aws/sso/cache/kiro-auth-token.json"
+```
+
+**Option 3: kiro-cli (SQLite database)**
+1. Install and log in with `kiro-cli`
+2. The proxy auto-detects the SQLite database
+```env
+# Or specify explicitly:
+KIRO_CLI_DB_FILE="~/Library/Application Support/kiro-cli/data.sqlite3"
+```
+
+**Common Options:**
+
+```env
+# Optional region override (default: us-east-1)
+KIRO_REGION="us-east-1"
+```
+
+**Default auto-detection locations (checked in order):**
+
+1. JSON: `~/.aws/sso/cache/kiro-auth-token.json`
+2. SQLite (macOS): `~/Library/Application Support/kiro-cli/data.sqlite3`
+3. SQLite (macOS): `~/Library/Application Support/amazon-q/data.sqlite3`
+4. SQLite (Linux): `~/.local/share/kiro-cli/data.sqlite3`
+5. SQLite (Linux): `~/.local/share/amazon-q/data.sqlite3`
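+
+A quick way to see which of the paths listed above exists on your machine (not required for setup):
+
+```bash
+ls ~/.aws/sso/cache/kiro-auth-token.json 2>/dev/null              # JSON (Kiro IDE)
+ls "$HOME/Library/Application Support/kiro-cli/data.sqlite3" \
+   ~/.local/share/kiro-cli/data.sqlite3 2>/dev/null               # SQLite (kiro-cli)
+```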
+
+**Authentication Types:**
+
+The proxy automatically detects the auth type based on credentials:
+- **Kiro Desktop Auth**: When `clientId`/`clientSecret` are NOT present (uses Kiro refresh endpoint)
+- **AWS SSO OIDC**: When `clientId`/`clientSecret` ARE present (uses AWS OIDC endpoint)
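+
+For illustration, a minimal `kiro-auth-token.json` that would be classified as AWS SSO OIDC (placeholder values; field names match what the loader reads):
+
+```json
+{
+  "accessToken": "your-access-token",
+  "refreshToken": "your-refresh-token",
+  "expiresAt": "2026-01-01T00:00:00Z",
+  "region": "us-east-1",
+  "clientId": "your-client-id",
+  "clientSecret": "your-client-secret"
+}
+```
+Without `clientId`/`clientSecret`, the same file is treated as Kiro Desktop auth.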
+
+
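+Once credentials are detected, Kiro models work like any other provider (adjust host/port to your deployment):
+
+```bash
+curl -X POST http://127.0.0.1:7777/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer your-proxy-api-key" \
+  -d '{"model": "kiro_cli/claude-sonnet-4.5", "messages": [{"role": "user", "content": "Hello"}]}'
+```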
+
Gemini CLI
@@ -799,7 +856,7 @@ python src/proxy_app/main.py [OPTIONS]
Options:
--host TEXT Host to bind (default: 0.0.0.0)
- --port INTEGER Port to run on (default: 8000)
+ --port INTEGER Port to run on (default: 7777)
--enable-request-logging Enable detailed per-request logging
--add-credential Launch interactive credential setup tool
```
@@ -871,7 +928,7 @@ touch key_usage.json
docker run -d \
--name llm-api-proxy \
--restart unless-stopped \
- -p 8000:8000 \
+ -p 7777:7777 \
-v $(pwd)/.env:/app/.env:ro \
-v $(pwd)/oauth_creds:/app/oauth_creds \
-v $(pwd)/logs:/app/logs \
@@ -945,7 +1002,7 @@ After=network.target
[Service]
Type=simple
WorkingDirectory=/path/to/LLM-API-Key-Proxy
-ExecStart=/path/to/python -m uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 8000
+ExecStart=/path/to/python -m uvicorn src.proxy_app.main:app --host 0.0.0.0 --port 7777
Restart=always
[Install]
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 36458929..550ce20b 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -11,7 +11,7 @@ services:
max-size: "10m"
max-file: "3"
ports:
- - "${PORT:-8000}:8000"
+ - "${PORT:-7777}:7777"
volumes:
# Mount .env files for configuration
- ./.env:/app/.env:ro
@@ -23,6 +23,14 @@ services:
- ./key_usage.json:/app/key_usage.json
# Optionally mount additional .env files (e.g., combined credential files)
# - ./antigravity_all_combined.env:/app/antigravity_all_combined.env:ro
+
+ # --- Kiro CLI Credentials ---
+ # Option 1: Mount AWS SSO cache (JSON credentials from Kiro IDE) - RECOMMENDED
+ - ~/.aws/sso/cache:/root/.aws/sso/cache:ro
+ # Option 2: Mount kiro-cli SQLite database (macOS)
- "~/Library/Application Support/kiro-cli:/root/Library/Application Support/kiro-cli:ro"
+ # Option 3: Mount kiro-cli SQLite database (Linux - uncomment if on Linux)
+ # - ~/.local/share/kiro-cli:/root/.local/share/kiro-cli:ro
environment:
# Skip OAuth interactive initialization in container (non-interactive)
- SKIP_OAUTH_INIT_CHECK=true
diff --git a/docker-compose.tls.yml b/docker-compose.tls.yml
index e210423f..8c7c4600 100644
--- a/docker-compose.tls.yml
+++ b/docker-compose.tls.yml
@@ -28,7 +28,7 @@ services:
max-size: "10m"
max-file: "3"
ports:
- - "${PORT:-8000}:8000"
+ - "${PORT:-7777}:7777"
volumes:
# Mount .env files for configuration
- ./.env:/app/.env:ro
diff --git a/docker-compose.yml b/docker-compose.yml
index 31964b60..06f9858f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,7 +9,7 @@ services:
max-size: "10m"
max-file: "3"
ports:
- - "${PORT:-8000}:8000"
+ - "${PORT:-7777}:7777"
volumes:
# Mount .env files for configuration
- ./.env:/app/.env:ro
diff --git a/src/proxy_app/launcher_tui.py b/src/proxy_app/launcher_tui.py
index 68338b02..cafaf509 100644
--- a/src/proxy_app/launcher_tui.py
+++ b/src/proxy_app/launcher_tui.py
@@ -62,7 +62,7 @@ def __init__(self, config_path: Path = Path("launcher_config.json")):
self.config_path = config_path
self.defaults = {
"host": "127.0.0.1",
- "port": 8000,
+ "port": 7777,
"enable_request_logging": False,
"enable_raw_logging": False,
}
@@ -217,6 +217,50 @@ def detect_credentials() -> dict:
providers[provider] = {"api_keys": 0, "oauth": 0, "custom": False}
providers[provider]["oauth"] += oauth_count
+ # Detect Kiro CLI credentials (env token, JSON, or SQLite)
+ kiro_refresh_token = env_vars.get("KIRO_REFRESH_TOKEN") or env_vars.get("REFRESH_TOKEN")
+ kiro_creds_file = env_vars.get("KIRO_CREDS_FILE")
+ kiro_cli_db = env_vars.get("KIRO_CLI_DB_FILE")
+ kiro_detected = False
+
+ # Check explicit env vars first (in priority order)
+ if kiro_refresh_token:
+ kiro_detected = True
+ elif kiro_creds_file:
+ path = Path(kiro_creds_file).expanduser()
+ if path.exists():
+ kiro_detected = True
+ elif kiro_cli_db:
+ path = Path(kiro_cli_db).expanduser()
+ if path.exists():
+ kiro_detected = True
+
+ # Check default paths if not found via env vars
+ if not kiro_detected:
+ default_paths = [
+ # JSON (Kiro IDE)
+ Path.home() / ".aws" / "sso" / "cache" / "kiro-auth-token.json",
+ # SQLite (macOS)
+ Path.home() / "Library" / "Application Support" / "kiro-cli" / "data.sqlite3",
+ Path.home() / "Library" / "Application Support" / "amazon-q" / "data.sqlite3",
+ # SQLite (Linux)
+ Path.home() / ".local" / "share" / "kiro-cli" / "data.sqlite3",
+ Path.home() / ".local" / "share" / "amazon-q" / "data.sqlite3",
+ ]
+ for path in default_paths:
+ if path.exists():
+ kiro_detected = True
+ break
+
+ if kiro_detected:
+ if "kiro_cli" not in providers:
+ providers["kiro_cli"] = {
+ "api_keys": 0,
+ "oauth": 0,
+ "custom": False,
+ }
+ providers["kiro_cli"]["oauth"] += 1
+
# Mark custom providers (have API_BASE set)
for provider in providers:
if os.getenv(f"{provider.upper()}_API_BASE"):
@@ -695,7 +739,7 @@ def show_config_menu(self):
# Reset to Default Settings
# Define defaults
default_host = "127.0.0.1"
- default_port = 8000
+ default_port = 7777
default_logging = False
default_raw_logging = False
default_api_key = "VerysecretKey"
diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py
index 12014bdc..a8f88253 100644
--- a/src/proxy_app/main.py
+++ b/src/proxy_app/main.py
@@ -17,7 +17,7 @@
parser.add_argument(
"--host", type=str, default="0.0.0.0", help="Host to bind the server to."
)
-parser.add_argument("--port", type=int, default=8000, help="Port to run the server on.")
+parser.add_argument("--port", type=int, default=7777, help="Port to run the server on.")
parser.add_argument(
"--enable-request-logging",
action="store_true",
@@ -450,6 +450,11 @@ async def lifespan(app: FastAPI):
credentials_to_initialize[provider].append(path)
continue
+ # Skip JSON parsing for kiro_cli credentials - identifiers may be SQLite DB paths, "json:"-prefixed files, or "env:token:" markers
+ if provider == "kiro_cli":
+ credentials_to_initialize[provider].append(path)
+ continue
+
try:
with open(path, "r") as f:
data = json.load(f)
diff --git a/src/proxy_app/quota_viewer.py b/src/proxy_app/quota_viewer.py
index 6cdb224d..30b63ccc 100644
--- a/src/proxy_app/quota_viewer.py
+++ b/src/proxy_app/quota_viewer.py
@@ -230,7 +230,7 @@ def _get_headers(self) -> Dict[str, str]:
def _get_base_url(self) -> str:
"""Get base URL for the current remote."""
if not self.current_remote:
- return "http://127.0.0.1:8000"
+ return "http://127.0.0.1:7777"
host = self.current_remote.get("host", "127.0.0.1")
host = normalize_host_for_connection(host)
@@ -239,7 +239,7 @@ def _get_base_url(self) -> str:
return host.rstrip("/")
# Otherwise construct from host:port
- port = self.current_remote.get("port", 8000)
+ port = self.current_remote.get("port", 7777)
scheme = get_scheme_for_host(host, port)
return f"{scheme}://{host}:{port}"
@@ -250,8 +250,8 @@ def _build_endpoint_url(self, endpoint: str) -> str:
Handles cases where base URL already contains a path (e.g., /v1):
- Base: "https://api.example.com/v1", Endpoint: "/v1/quota-stats"
-> "https://api.example.com/v1/quota-stats" (no duplication)
- - Base: "http://localhost:8000", Endpoint: "/v1/quota-stats"
- -> "http://localhost:8000/v1/quota-stats"
+ - Base: "http://localhost:7777", Endpoint: "/v1/quota-stats"
+ -> "http://localhost:7777/v1/quota-stats"
Args:
endpoint: The endpoint path (e.g., "/v1/quota-stats")
@@ -308,7 +308,7 @@ def check_connection(
# Hit the root domain, not the path (e.g., /v1 would 404)
url = f"{parsed.scheme}://{parsed.netloc}/"
else:
- port = remote.get("port", 8000)
+ port = remote.get("port", 7777)
scheme = get_scheme_for_host(host, port)
url = f"{scheme}://{host}:{port}/"
@@ -1349,7 +1349,7 @@ def show_manage_remotes_screen(self):
str(idx),
remote.get("name", ""),
remote.get("host", ""),
- str(remote.get("port", 8000)),
+ str(remote.get("port", 7777)),
is_default,
)
@@ -1406,7 +1406,7 @@ def _add_remote_dialog(self):
if is_full_url(host):
port_default = ""
else:
- port_default = "8000"
+ port_default = "7777"
port_str = Prompt.ask(
"Port (empty for full URLs)", default=port_default
@@ -1417,7 +1417,7 @@ def _add_remote_dialog(self):
try:
port = int(port_str)
except ValueError:
- port = 8000
+ port = 7777
api_key = Prompt.ask("API Key (optional)", default="").strip() or None
@@ -1455,7 +1455,7 @@ def _edit_remote_dialog(self, remote: Dict[str, Any]):
try:
new_port = int(new_port_str)
except ValueError:
- new_port = current_port if current_port != "" else 8000
+ new_port = current_port if current_port != "" else 7777
current_key = remote.get("api_key", "") or ""
display_key = f"{current_key[:8]}..." if len(current_key) > 8 else current_key
diff --git a/src/proxy_app/quota_viewer_config.py b/src/proxy_app/quota_viewer_config.py
index 7b2f573f..c6cbaee0 100644
--- a/src/proxy_app/quota_viewer_config.py
+++ b/src/proxy_app/quota_viewer_config.py
@@ -57,7 +57,7 @@ def _load(self) -> Dict[str, Any]:
{
"name": "Local",
"host": "127.0.0.1",
- "port": 8000,
+ "port": 7777,
"api_key": None,
"is_default": True,
}
@@ -112,7 +112,7 @@ def add_remote(
self,
name: str,
host: str,
- port: Optional[Union[int, str]] = 8000,
+ port: Optional[Union[int, str]] = 7777,
api_key: Optional[str] = None,
is_default: bool = False,
) -> bool:
@@ -122,7 +122,7 @@ def add_remote(
Args:
name: Display name for the remote
host: Hostname, IP address, or full URL (e.g., https://api.example.com/v1)
- port: Port number (default 8000). Can be None or empty string for full URLs.
+ port: Port number (default 7777). Can be None or empty string for full URLs.
api_key: Optional API key for authentication
is_default: Whether this should be the default remote
@@ -252,7 +252,7 @@ def sync_with_launcher_config(self) -> None:
launcher_config = json.load(f)
host = launcher_config.get("host", "127.0.0.1")
- port = launcher_config.get("port", 8000)
+ port = launcher_config.get("port", 7777)
# Update Local remote
local_remote = self.get_remote_by_name("Local")
diff --git a/src/rotator_library/credential_manager.py b/src/rotator_library/credential_manager.py
index 9a7e5edb..94109b8b 100644
--- a/src/rotator_library/credential_manager.py
+++ b/src/rotator_library/credential_manager.py
@@ -124,6 +124,55 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
lib_logger.info("Starting automated OAuth credential discovery...")
final_config = {}
+ # Special-case: Kiro CLI supports multiple credential sources
+ # Priority: REFRESH_TOKEN (env) > KIRO_CREDS_FILE (JSON) > KIRO_CLI_DB_FILE (SQLite) > default paths
+ kiro_refresh_token = self.env_vars.get("KIRO_REFRESH_TOKEN") or self.env_vars.get("REFRESH_TOKEN")
+ kiro_creds_file = self.env_vars.get("KIRO_CREDS_FILE")
+ kiro_cli_db_path = self.env_vars.get("KIRO_CLI_DB_FILE")
+
+ if kiro_refresh_token:
+ # Use env:token: prefix to indicate direct refresh token from env
+ final_config["kiro_cli"] = ["env:token:kiro_cli"]
+ lib_logger.info("Using KIRO_REFRESH_TOKEN for kiro_cli")
+ elif kiro_creds_file:
+ path = Path(kiro_creds_file).expanduser()
+ if path.exists():
+ # Use json: prefix to indicate JSON credential type
+ final_config["kiro_cli"] = [f"json:{path.resolve()}"]
+ lib_logger.info(f"Using KIRO_CREDS_FILE for kiro_cli: {path}")
+ else:
+ lib_logger.warning(f"KIRO_CREDS_FILE is set but the file does not exist: {path}")
+ elif kiro_cli_db_path:
+ path = Path(kiro_cli_db_path).expanduser()
+ if path.exists():
+ final_config["kiro_cli"] = [str(path.resolve())]
+ lib_logger.info(f"Using KIRO_CLI_DB_FILE for kiro_cli: {path}")
+ else:
+ lib_logger.warning(f"KIRO_CLI_DB_FILE is set but the file does not exist: {path}")
+
+ if "kiro_cli" not in final_config:
+ # Check default paths: JSON first, then SQLite
+ default_json_paths = [
+ Path.home() / ".aws" / "sso" / "cache" / "kiro-auth-token.json",
+ ]
+ default_sqlite_paths = [
+ # macOS
+ Path.home() / "Library" / "Application Support" / "kiro-cli" / "data.sqlite3",
+ Path.home() / "Library" / "Application Support" / "amazon-q" / "data.sqlite3",
+ # Linux
+ Path.home() / ".local" / "share" / "kiro-cli" / "data.sqlite3",
+ Path.home() / ".local" / "share" / "amazon-q" / "data.sqlite3",
+ ]
+ # Try JSON first
+ for path in default_json_paths:
+ if path.exists():
+ final_config["kiro_cli"] = [f"json:{path.resolve()}"]
+ lib_logger.info(f"Using default kiro_cli JSON path: {path}")
+ break
+ # Then try SQLite
+ if "kiro_cli" not in final_config:
+ for path in default_sqlite_paths:
+ if path.exists():
+ final_config["kiro_cli"] = [str(path.resolve())]
+ lib_logger.info(f"Using default kiro_cli SQLite path: {path}")
+ break
+
# PHASE 1: Discover environment variable-based OAuth credentials
# These take priority for stateless deployments
env_oauth_creds = self._discover_env_oauth_credentials()
diff --git a/src/rotator_library/litellm_providers.py b/src/rotator_library/litellm_providers.py
index 136f62b5..291b4fac 100644
--- a/src/rotator_library/litellm_providers.py
+++ b/src/rotator_library/litellm_providers.py
@@ -515,7 +515,7 @@
"route": 'lemonade/',
"api_key_env_vars": [],
"api_base_env_vars": ['LEMONADE_API_BASE'],
- "api_base_url": 'http://localhost:8000/api/v1',
+ "api_base_url": 'http://localhost:7777/api/v1',
"endpoints": ['/chat/completions'],
"features": ['streaming', 'function_calling', 'response_format', 'tools'],
"model_count": 2,
diff --git a/src/rotator_library/provider_config.py b/src/rotator_library/provider_config.py
index 51d40043..2f0582bb 100644
--- a/src/rotator_library/provider_config.py
+++ b/src/rotator_library/provider_config.py
@@ -422,7 +422,7 @@
"category": "local",
"note": "Local proxy. API key is optional.",
"extra_vars": [
- ("LEMONADE_API_BASE", "Lemonade URL", "http://localhost:8000/api/v1"),
+ ("LEMONADE_API_BASE", "Lemonade URL", "http://localhost:7777/api/v1"),
],
},
# NOTE: ollama, llamafile, petals, triton are in PROVIDER_BLACKLIST
@@ -643,7 +643,7 @@ class ProviderConfig:
Request: openai/gpt-4 → LiteLLM gets model="openai/gpt-4", api_base="http://..."
2. Custom OpenAI-compatible provider:
- Set MYSERVER_API_BASE=http://myserver:8000/v1
+ Set MYSERVER_API_BASE=http://myserver:7777/v1
Request: myserver/llama-3 → LiteLLM gets model="openai/llama-3",
api_base="http://...", custom_llm_provider="openai"
"""
diff --git a/src/rotator_library/providers/__init__.py b/src/rotator_library/providers/__init__.py
index 8ba35c5e..71e5de33 100644
--- a/src/rotator_library/providers/__init__.py
+++ b/src/rotator_library/providers/__init__.py
@@ -24,7 +24,7 @@ class DynamicOpenAICompatibleProvider:
_API_KEY - The API key
Example:
- MYSERVER_API_BASE=http://localhost:8000/v1
+ MYSERVER_API_BASE=http://localhost:7777/v1
MYSERVER_API_KEY=sk-xxx
Note: For known providers (openai, anthropic, etc.), setting _API_BASE
diff --git a/src/rotator_library/providers/kiro_auth_base.py b/src/rotator_library/providers/kiro_auth_base.py
new file mode 100644
index 00000000..fc8f8c46
--- /dev/null
+++ b/src/rotator_library/providers/kiro_auth_base.py
@@ -0,0 +1,511 @@
+# SPDX-License-Identifier: LGPL-3.0-only
+# Copyright (c) 2026 Mirrowel
+
+import asyncio
+import json
+import logging
+import sqlite3
+from datetime import datetime, timedelta, timezone
+from enum import Enum
+from pathlib import Path
+from typing import Optional
+
+import httpx
+
+from .utilities.kiro_utils import (
+ TOKEN_REFRESH_THRESHOLD,
+ get_kiro_refresh_url,
+ get_kiro_api_host,
+ get_kiro_q_host,
+ get_aws_sso_oidc_url,
+ get_machine_fingerprint,
+)
+
+
+lib_logger = logging.getLogger("rotator_library")
+
+
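+# NOTE: "odic" is the literal key spelling stored by kiro-cli / amazon-q; it is intentional, not a typo.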
+SQLITE_TOKEN_KEYS = [
+ "kirocli:social:token",
+ "kirocli:odic:token",
+ "codewhisperer:odic:token",
+]
+
+SQLITE_REGISTRATION_KEYS = [
+ "kirocli:odic:device-registration",
+ "codewhisperer:odic:device-registration",
+]
+
+
+class AuthType(Enum):
+ KIRO_DESKTOP = "kiro_desktop"
+ AWS_SSO_OIDC = "aws_sso_oidc"
+
+
+class KiroAuthManager:
+ def __init__(
+ self,
+ refresh_token: Optional[str] = None,
+ profile_arn: Optional[str] = None,
+ region: str = "us-east-1",
+ sqlite_db: Optional[str] = None,
+ json_creds_file: Optional[str] = None,
+ client_id: Optional[str] = None,
+ client_secret: Optional[str] = None,
+ ) -> None:
+ self._refresh_token = refresh_token
+ self._profile_arn = profile_arn
+ self._region = region
+ self._sqlite_db = sqlite_db
+ self._json_creds_file = json_creds_file
+ self._client_id = client_id
+ self._client_secret = client_secret
+ self._sso_region: Optional[str] = None
+ self._scopes: Optional[list] = None
+ self._client_id_hash: Optional[str] = None
+
+ self._sqlite_token_key: Optional[str] = None
+ self._access_token: Optional[str] = None
+ self._expires_at: Optional[datetime] = None
+ self._lock = asyncio.Lock()
+
+ self._auth_type: AuthType = AuthType.KIRO_DESKTOP
+ self._refresh_url = get_kiro_refresh_url(region)
+ self._api_host = get_kiro_api_host(region)
+ self._q_host = get_kiro_q_host(region)
+ self._fingerprint = get_machine_fingerprint()
+
+ # Load credentials from JSON file first (if provided), then SQLite as fallback
+ if json_creds_file:
+ self._load_credentials_from_json_file(json_creds_file)
+ elif sqlite_db:
+ self._load_credentials_from_sqlite(sqlite_db)
+
+ self._detect_auth_type()
+
+ @property
+ def auth_type(self) -> AuthType:
+ return self._auth_type
+
+ @property
+ def profile_arn(self) -> Optional[str]:
+ return self._profile_arn
+
+ @property
+ def api_host(self) -> str:
+ return self._api_host
+
+ @property
+ def q_host(self) -> str:
+ return self._q_host
+
+ @property
+ def fingerprint(self) -> str:
+ return self._fingerprint
+
+ def _detect_auth_type(self) -> None:
+ if self._client_id and self._client_secret:
+ self._auth_type = AuthType.AWS_SSO_OIDC
+ else:
+ self._auth_type = AuthType.KIRO_DESKTOP
+
+ def _load_credentials_from_json_file(self, file_path: str) -> None:
+ """Load credentials from a JSON file (e.g., kiro-auth-token.json)."""
+ try:
+ path = Path(file_path).expanduser()
+ if not path.exists():
+ lib_logger.warning(f"Credentials file not found: {file_path}")
+ return
+
+ with open(path, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ # Load tokens
+ if "refreshToken" in data:
+ self._refresh_token = data["refreshToken"]
+ if "accessToken" in data:
+ self._access_token = data["accessToken"]
+ if "profileArn" in data:
+ self._profile_arn = data["profileArn"]
+ if "region" in data:
+ self._region = data["region"]
+ self._sso_region = data["region"]
+ # Update URLs for new region
+ self._refresh_url = get_kiro_refresh_url(self._region)
+ self._api_host = get_kiro_api_host(self._region)
+ self._q_host = get_kiro_q_host(self._region)
+ lib_logger.debug(f"Region from JSON file: {self._region}")
+
+ # Load clientIdHash for Enterprise Kiro IDE
+ if "clientIdHash" in data:
+ self._client_id_hash = data["clientIdHash"]
+ self._load_enterprise_device_registration(self._client_id_hash, path.parent)
+
+ # Load AWS SSO OIDC fields if directly in credentials file
+ if "clientId" in data:
+ self._client_id = data["clientId"]
+ if "clientSecret" in data:
+ self._client_secret = data["clientSecret"]
+
+ # Parse expiresAt
+ if "expiresAt" in data:
+ try:
+ expires_str = data["expiresAt"]
+ if expires_str.endswith("Z"):
+ self._expires_at = datetime.fromisoformat(
+ expires_str.replace("Z", "+00:00")
+ )
+ else:
+ self._expires_at = datetime.fromisoformat(expires_str)
+ except Exception as exc:
+ lib_logger.warning(f"Failed to parse expiresAt: {exc}")
+
+ lib_logger.info(f"Credentials loaded from JSON file: {file_path}")
+
+ except json.JSONDecodeError as exc:
+ lib_logger.error(f"JSON decode error in credentials file: {exc}")
+ except Exception as exc:
+ lib_logger.error(f"Error loading credentials from JSON file: {exc}")
+
+ def _load_enterprise_device_registration(
+ self, client_id_hash: str, cache_dir: Path
+ ) -> None:
+ """
+ Load clientId and clientSecret from Enterprise Kiro IDE device registration file.
+ Enterprise Kiro IDE uses AWS SSO OIDC. Device registration is stored at:
+ ~/.aws/sso/cache/{clientIdHash}.json
+ """
+ try:
+ registration_file = cache_dir / f"{client_id_hash}.json"
+ if not registration_file.exists():
+ lib_logger.debug(
+ f"Device registration file not found: {registration_file}"
+ )
+ return
+
+ with open(registration_file, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ if "clientId" in data:
+ self._client_id = data["clientId"]
+ if "clientSecret" in data:
+ self._client_secret = data["clientSecret"]
+ if "region" in data and not self._sso_region:
+ self._sso_region = data["region"]
+
+ lib_logger.info(
+ f"Loaded device registration from: {registration_file.name}"
+ )
+ except Exception as exc:
+ lib_logger.warning(f"Failed to load device registration: {exc}")
+
+ def _save_credentials_to_json_file(self) -> None:
+ """Save updated credentials back to the JSON file."""
+ if not self._json_creds_file:
+ return
+
+ try:
+ path = Path(self._json_creds_file).expanduser()
+ if not path.exists():
+ lib_logger.warning(f"JSON file not found for writing: {self._json_creds_file}")
+ return
+
+ # Read existing data to preserve other fields
+ with open(path, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ # Update token fields
+ if self._access_token:
+ data["accessToken"] = self._access_token
+ if self._refresh_token:
+ data["refreshToken"] = self._refresh_token
+ if self._expires_at:
+ data["expiresAt"] = self._expires_at.isoformat()
+
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(data, f, indent=2)
+
+ lib_logger.debug(f"Saved credentials to JSON file: {self._json_creds_file}")
+ except Exception as exc:
+ lib_logger.error(f"Error saving credentials to JSON file: {exc}")
+
+ def _load_credentials_from_sqlite(self, db_path: str) -> None:
+ try:
+ path = Path(db_path).expanduser()
+ if not path.exists():
+ lib_logger.warning(f"SQLite database not found: {db_path}")
+ return
+
+ conn = sqlite3.connect(str(path))
+ cursor = conn.cursor()
+
+ token_row = None
+ for key in SQLITE_TOKEN_KEYS:
+ cursor.execute("SELECT value FROM auth_kv WHERE key = ?", (key,))
+ token_row = cursor.fetchone()
+ if token_row:
+ self._sqlite_token_key = key
+ break
+
+ if token_row:
+ token_data = json.loads(token_row[0])
+ if token_data:
+ self._access_token = token_data.get("access_token") or token_data.get(
+ "accessToken"
+ )
+ self._refresh_token = token_data.get(
+ "refresh_token"
+ ) or token_data.get("refreshToken")
+ self._profile_arn = token_data.get("profile_arn") or token_data.get(
+ "profileArn"
+ )
+ if "region" in token_data:
+ self._sso_region = token_data.get("region")
+ self._scopes = token_data.get("scopes")
+
+ expires_str = token_data.get("expires_at") or token_data.get(
+ "expiresAt"
+ )
+ if expires_str:
+ try:
+ if expires_str.endswith("Z"):
+ expires_str = expires_str.replace("Z", "+00:00")
+ self._expires_at = datetime.fromisoformat(expires_str)
+ except Exception as exc:
+ lib_logger.warning(
+ f"Failed to parse expires_at from SQLite: {exc}"
+ )
+
+ registration_row = None
+ for key in SQLITE_REGISTRATION_KEYS:
+ cursor.execute("SELECT value FROM auth_kv WHERE key = ?", (key,))
+ registration_row = cursor.fetchone()
+ if registration_row:
+ break
+
+ if registration_row:
+ registration_data = json.loads(registration_row[0])
+ if registration_data:
+ self._client_id = registration_data.get("client_id") or registration_data.get(
+ "clientId"
+ )
+ self._client_secret = registration_data.get(
+ "client_secret"
+ ) or registration_data.get("clientSecret")
+ if not self._sso_region:
+ self._sso_region = registration_data.get("region")
+
+ conn.close()
+ except sqlite3.Error as exc:
+ lib_logger.error(f"SQLite error loading credentials: {exc}")
+ except json.JSONDecodeError as exc:
+ lib_logger.error(f"JSON decode error in SQLite data: {exc}")
+ except Exception as exc:
+ lib_logger.error(f"Error loading credentials from SQLite: {exc}")
+
+ def _save_credentials_to_sqlite(self) -> None:
+ if not self._sqlite_db:
+ return
+
+ try:
+ path = Path(self._sqlite_db).expanduser()
+ if not path.exists():
+ lib_logger.warning(
+ f"SQLite database not found for writing: {self._sqlite_db}"
+ )
+ return
+
+ conn = sqlite3.connect(str(path), timeout=5.0)
+ cursor = conn.cursor()
+
+ token_data = {
+ "access_token": self._access_token,
+ "refresh_token": self._refresh_token,
+ "expires_at": self._expires_at.isoformat() if self._expires_at else None,
+ "region": self._sso_region or self._region,
+ }
+ if self._scopes:
+ token_data["scopes"] = self._scopes
+
+ token_json = json.dumps(token_data)
+ if self._sqlite_token_key:
+ cursor.execute(
+ "UPDATE auth_kv SET value = ? WHERE key = ?",
+ (token_json, self._sqlite_token_key),
+ )
+ if cursor.rowcount > 0:
+ conn.commit()
+ conn.close()
+ return
+
+ for key in SQLITE_TOKEN_KEYS:
+ cursor.execute(
+ "UPDATE auth_kv SET value = ? WHERE key = ?",
+ (token_json, key),
+ )
+ if cursor.rowcount > 0:
+ conn.commit()
+ conn.close()
+ return
+
+ conn.close()
+ lib_logger.warning("Failed to save credentials to SQLite: no matching keys")
+ except sqlite3.Error as exc:
+ lib_logger.error(f"SQLite error saving credentials: {exc}")
+ except Exception as exc:
+ lib_logger.error(f"Error saving credentials to SQLite: {exc}")
+
+ def is_token_expiring_soon(self) -> bool:
+ if not self._expires_at:
+ return True
+ now = datetime.now(timezone.utc)
+ threshold = now.timestamp() + TOKEN_REFRESH_THRESHOLD
+ return self._expires_at.timestamp() <= threshold
+
+ def is_token_expired(self) -> bool:
+ if not self._expires_at:
+ return True
+ return datetime.now(timezone.utc) >= self._expires_at
+
+ async def _refresh_token_kiro_desktop(self) -> None:
+ if not self._refresh_token:
+ raise ValueError("Refresh token is not set")
+
+ payload = {"refreshToken": self._refresh_token}
+ headers = {
+ "Content-Type": "application/json",
+ "User-Agent": f"KiroIDE-0.7.45-{self._fingerprint}",
+ }
+
+ async with httpx.AsyncClient(timeout=30) as client:
+ response = await client.post(self._refresh_url, json=payload, headers=headers)
+ response.raise_for_status()
+ data = response.json()
+
+ new_access_token = data.get("accessToken")
+ new_refresh_token = data.get("refreshToken")
+ expires_in = data.get("expiresIn", 3600)
+ new_profile_arn = data.get("profileArn")
+
+ if not new_access_token:
+ # Avoid including the raw response, which may contain a refresh token
+ raise ValueError(f"Refresh response missing accessToken (keys: {list(data)})")
+
+ self._access_token = new_access_token
+ if new_refresh_token:
+ self._refresh_token = new_refresh_token
+ if new_profile_arn:
+ self._profile_arn = new_profile_arn
+
+ self._expires_at = datetime.now(timezone.utc) + timedelta(
+ seconds=max(60, expires_in - 60)
+ )
+ self._save_credentials()
+
+ def _save_credentials(self) -> None:
+ """Save credentials to the appropriate storage (JSON file or SQLite)."""
+ if self._json_creds_file:
+ self._save_credentials_to_json_file()
+ elif self._sqlite_db:
+ self._save_credentials_to_sqlite()
+
+ async def _do_aws_sso_oidc_refresh(self) -> None:
+ if not self._refresh_token:
+ raise ValueError("Refresh token is not set")
+ if not self._client_id:
+ raise ValueError("Client ID is not set (required for AWS SSO OIDC)")
+ if not self._client_secret:
+ raise ValueError("Client secret is not set (required for AWS SSO OIDC)")
+
+ sso_region = self._sso_region or self._region
+ url = get_aws_sso_oidc_url(sso_region)
+ payload = {
+ "grantType": "refresh_token",
+ "clientId": self._client_id,
+ "clientSecret": self._client_secret,
+ "refreshToken": self._refresh_token,
+ }
+
+ async with httpx.AsyncClient(timeout=30) as client:
+ response = await client.post(url, json=payload)
+ if response.status_code != 200:
+ lib_logger.error(
+ f"AWS SSO OIDC refresh failed: status={response.status_code}, body={response.text}"
+ )
+ response.raise_for_status()
+ result = response.json()
+
+ new_access_token = result.get("accessToken")
+ new_refresh_token = result.get("refreshToken")
+ expires_in = result.get("expiresIn", 3600)
+ if not new_access_token:
+ # Avoid including the raw response, which may contain a refresh token
+ raise ValueError(f"AWS SSO OIDC response missing accessToken (keys: {list(result)})")
+
+ self._access_token = new_access_token
+ if new_refresh_token:
+ self._refresh_token = new_refresh_token
+ self._expires_at = datetime.now(timezone.utc) + timedelta(
+ seconds=max(60, expires_in - 60)
+ )
+
+ self._save_credentials()
+
+ async def _refresh_token_aws_sso_oidc(self) -> None:
+ try:
+ await self._do_aws_sso_oidc_refresh()
+ except httpx.HTTPStatusError as exc:
+ if exc.response.status_code == 400:
+ # Reload credentials and retry
+ if self._json_creds_file:
+ lib_logger.warning(
+ "AWS SSO refresh failed with 400. Reloading JSON file and retrying."
+ )
+ self._load_credentials_from_json_file(self._json_creds_file)
+ await self._do_aws_sso_oidc_refresh()
+ elif self._sqlite_db:
+ lib_logger.warning(
+ "AWS SSO refresh failed with 400. Reloading SQLite and retrying."
+ )
+ self._load_credentials_from_sqlite(self._sqlite_db)
+ await self._do_aws_sso_oidc_refresh()
+ else:
+ raise
+ else:
+ raise
+
+ async def _refresh_token_request(self) -> None:
+ if self._auth_type == AuthType.AWS_SSO_OIDC:
+ await self._refresh_token_aws_sso_oidc()
+ else:
+ await self._refresh_token_kiro_desktop()
+
+ async def get_access_token(self) -> str:
+ async with self._lock:
+ if self._access_token and not self.is_token_expiring_soon():
+ return self._access_token
+
+ # Try reloading from file in case another process refreshed the token
+ if self.is_token_expiring_soon():
+ if self._json_creds_file:
+ self._load_credentials_from_json_file(self._json_creds_file)
+ elif self._sqlite_db:
+ self._load_credentials_from_sqlite(self._sqlite_db)
+ if self._access_token and not self.is_token_expiring_soon():
+ return self._access_token
+
+ try:
+ await self._refresh_token_request()
+ except httpx.HTTPStatusError as exc:
+ if self._access_token and not self.is_token_expired():
+ lib_logger.warning(
+ f"Refresh failed but access token still valid: {exc}"
+ )
+ return self._access_token
+ raise
+
+ if not self._access_token:
+ raise ValueError("Failed to obtain access token")
+ return self._access_token
+
+ async def force_refresh(self) -> None:
+ async with self._lock:
+ await self._refresh_token_request()
diff --git a/src/rotator_library/providers/kiro_cli_provider.py b/src/rotator_library/providers/kiro_cli_provider.py
new file mode 100644
index 00000000..d238ec27
--- /dev/null
+++ b/src/rotator_library/providers/kiro_cli_provider.py
@@ -0,0 +1,213 @@
+# SPDX-License-Identifier: LGPL-3.0-only
+# Copyright (c) 2026 Mirrowel
+
+import logging
+import os
+from typing import Any, AsyncGenerator, Dict, List, Optional, Union
+
+import httpx
+import litellm
+from litellm.exceptions import RateLimitError
+
+from .provider_interface import ProviderInterface
+from .kiro_auth_base import AuthType, KiroAuthManager
+from .utilities.kiro_converters import (
+ build_kiro_payload,
+ convert_openai_messages_to_unified,
+ convert_openai_tools_to_unified,
+)
+from .utilities.kiro_http_client import KiroHttpClient
+from .utilities.kiro_streaming import (
+ collect_stream_response,
+ stream_kiro_to_openai_chunks,
+)
+from .utilities.kiro_utils import generate_conversation_id, get_kiro_headers
+from ..model_definitions import ModelDefinitions
+
+
+lib_logger = logging.getLogger("rotator_library")
+
+
+class KiroCliProvider(ProviderInterface):
+ skip_cost_calculation = True
+ provider_env_name: str = "kiro_cli"
+
+ def __init__(self) -> None:
+ self.model_definitions = ModelDefinitions()
+ self._auth_managers: Dict[str, KiroAuthManager] = {}
+
+ def _get_auth_manager(self, credential_identifier: str) -> KiroAuthManager:
+ if credential_identifier not in self._auth_managers:
+ region = os.getenv("KIRO_REGION", "us-east-1")
+ refresh_token = os.getenv("KIRO_REFRESH_TOKEN") or os.getenv("REFRESH_TOKEN", "")
+ profile_arn = os.getenv("PROFILE_ARN") or os.getenv("KIRO_PROFILE_ARN")
+
+ # Determine credential type based on prefix
+ if credential_identifier.startswith("env:token:"):
+ # Direct refresh token from environment variable
+ if not refresh_token:
+ raise ValueError(
+ "KIRO_REFRESH_TOKEN or REFRESH_TOKEN environment variable is required"
+ )
+ auth_manager = KiroAuthManager(
+ refresh_token=refresh_token,
+ profile_arn=profile_arn,
+ region=region,
+ )
+ lib_logger.debug("Created KiroAuthManager with env refresh token")
+ elif credential_identifier.startswith("json:"):
+ # JSON credentials file
+ json_path = credential_identifier[5:] # Remove "json:" prefix
+ auth_manager = KiroAuthManager(
+ refresh_token=refresh_token or None,
+ profile_arn=profile_arn,
+ region=region,
+ json_creds_file=json_path,
+ )
+ lib_logger.debug(f"Created KiroAuthManager with JSON file: {json_path}")
+ else:
+ # SQLite database
+ auth_manager = KiroAuthManager(
+ refresh_token=refresh_token or None,
+ profile_arn=profile_arn,
+ region=region,
+ sqlite_db=credential_identifier,
+ )
+ lib_logger.debug(f"Created KiroAuthManager with SQLite DB: {credential_identifier}")
+
+ self._auth_managers[credential_identifier] = auth_manager
+ return self._auth_managers[credential_identifier]
+
+ def _resolve_model_id(self, model: str) -> str:
+ model_name = model.split("/")[-1] if "/" in model else model
+ model_id = self.model_definitions.get_model_id("kiro_cli", model_name)
+ return model_id or model_name
+
+ async def initialize_token(self, credential_identifier: str) -> None:
+ auth_manager = self._get_auth_manager(credential_identifier)
+ await auth_manager.get_access_token()
+
+ async def get_auth_header(self, credential_identifier: str) -> Dict[str, str]:
+ auth_manager = self._get_auth_manager(credential_identifier)
+ token = await auth_manager.get_access_token()
+ return {"Authorization": f"Bearer {token}"}
+
+ async def get_models(self, api_key: str, client: httpx.AsyncClient) -> List[str]:
+ models: List[str] = []
+ env_models = self.model_definitions.get_all_provider_models("kiro_cli")
+ env_ids = set()
+ if env_models:
+ for model in env_models:
+ model_name = model.split("/")[-1] if "/" in model else model
+ model_id = self.model_definitions.get_model_id("kiro_cli", model_name)
+ if model_id:
+ env_ids.add(model_id)
+ models.append(model)
+
+ if os.getenv("KIRO_CLI_ENABLE_DYNAMIC_MODELS", "true").lower() in (
+ "true",
+ "1",
+ "yes",
+ ):
+ try:
+ auth_manager = self._get_auth_manager(api_key)
+ token = await auth_manager.get_access_token()
+ headers = get_kiro_headers(auth_manager, token)
+ params = {"origin": "AI_EDITOR"}
+ if auth_manager.auth_type == AuthType.KIRO_DESKTOP and auth_manager.profile_arn:
+ params["profileArn"] = auth_manager.profile_arn
+
+ url = f"{auth_manager.q_host}/ListAvailableModels"
+ response = await client.get(url, headers=headers, params=params, timeout=30)
+ if response.status_code == 200:
+ data = response.json()
+ for model_info in data.get("models", []):
+ model_id = model_info.get("modelId") or model_info.get("model_id")
+ if model_id and model_id not in env_ids:
+ models.append(f"kiro_cli/{model_id}")
+ env_ids.add(model_id)
+ except Exception as exc:
+ lib_logger.debug(f"Dynamic model discovery failed for kiro_cli: {exc}")
+
+ return models
+
+ def has_custom_logic(self) -> bool:
+ return True
+
+ async def _build_payload(
+ self, model: str, messages: List[Dict[str, Any]], tools: Optional[List[Dict[str, Any]]]
+ ) -> Dict[str, Any]:
+ system_prompt, unified_messages = convert_openai_messages_to_unified(messages)
+ unified_tools = convert_openai_tools_to_unified(tools)
+ conversation_id = generate_conversation_id(messages)
+ model_id = self._resolve_model_id(model)
+ return build_kiro_payload(
+ messages=unified_messages,
+ system_prompt=system_prompt,
+ model_id=model_id,
+ tools=unified_tools,
+ conversation_id=conversation_id,
+ profile_arn="",
+ inject_thinking=True,
+ ).payload
+
+ async def acompletion(
+ self, client: httpx.AsyncClient, **kwargs
+ ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
+ model = kwargs["model"]
+ credential_path = kwargs.pop("credential_identifier")
+ messages = kwargs.get("messages", [])
+ tools = kwargs.get("tools")
+ stream = kwargs.get("stream", False)
+
+ auth_manager = self._get_auth_manager(credential_path)
+ profile_arn = ""
+ if auth_manager.auth_type == AuthType.KIRO_DESKTOP and auth_manager.profile_arn:
+ profile_arn = auth_manager.profile_arn
+
+ payload = await self._build_payload(model, messages, tools)
+ if profile_arn:
+ payload["profileArn"] = profile_arn
+
+ url = f"{auth_manager.api_host}/generateAssistantResponse"
+
+ if stream:
+ http_client = KiroHttpClient(auth_manager, shared_client=None)
+ else:
+ http_client = KiroHttpClient(auth_manager, shared_client=client)
+
+ response = await http_client.request_with_retry(
+ "POST", url, payload, stream=True
+ )
+ if response.status_code != 200:
+ error_body = await response.aread()
+ await http_client.close()  # release the HTTP client before raising
+ if response.status_code == 429:
+ raise RateLimitError(
+ message=f"Kiro rate limit exceeded: {error_body!r}",
+ llm_provider="kiro_cli",
+ model=model,
+ response=response,
+ )
+ raise httpx.HTTPStatusError(
+ f"Kiro API error: {response.status_code} {error_body!r}",
+ request=response.request,
+ response=response,
+ )
+
+ if stream:
+ async def stream_handler():
+ try:
+ async for chunk in stream_kiro_to_openai_chunks(
+ response, model
+ ):
+ yield litellm.ModelResponse(**chunk)
+ finally:
+ await http_client.close()
+
+ return stream_handler()
+
+ try:
+ openai_response = await collect_stream_response(response, model)
+ return litellm.ModelResponse(**openai_response)
+ finally:
+ await http_client.close()
diff --git a/src/rotator_library/providers/openai_compatible_provider.py b/src/rotator_library/providers/openai_compatible_provider.py
index b04c91f0..62bd0dda 100644
--- a/src/rotator_library/providers/openai_compatible_provider.py
+++ b/src/rotator_library/providers/openai_compatible_provider.py
@@ -26,7 +26,7 @@ class OpenAICompatibleProvider(ProviderInterface):
_API_KEY - The API key (optional for some providers)
Example:
- MYSERVER_API_BASE=http://localhost:8000/v1
+ MYSERVER_API_BASE=http://localhost:7777/v1
MYSERVER_API_KEY=sk-xxx
Note: This is only used for providers NOT in the known LiteLLM providers list.
diff --git a/src/rotator_library/providers/utilities/kiro_converters.py b/src/rotator_library/providers/utilities/kiro_converters.py
new file mode 100644
index 00000000..ae070acb
--- /dev/null
+++ b/src/rotator_library/providers/utilities/kiro_converters.py
@@ -0,0 +1,778 @@
+# SPDX-License-Identifier: LGPL-3.0-only
+# Copyright (c) 2026 Mirrowel
+
+import json
+import logging
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional, Tuple
+
+from .kiro_utils import (
+ FAKE_REASONING_ENABLED,
+ FAKE_REASONING_MAX_TOKENS,
+ TOOL_DESCRIPTION_MAX_LENGTH,
+)
+
+
+lib_logger = logging.getLogger("rotator_library")
+
+
+@dataclass
+class UnifiedMessage:
+ role: str
+ content: Any = ""
+ tool_calls: Optional[List[Dict[str, Any]]] = None
+ tool_results: Optional[List[Dict[str, Any]]] = None
+ images: Optional[List[Dict[str, Any]]] = None
+
+
+@dataclass
+class UnifiedTool:
+ name: str
+ description: Optional[str] = None
+ input_schema: Optional[Dict[str, Any]] = None
+
+
+@dataclass
+class KiroPayloadResult:
+ payload: Dict[str, Any]
+ tool_documentation: str = ""
+
+
+def extract_text_content(content: Any) -> str:
+ if content is None:
+ return ""
+ if isinstance(content, str):
+ return content
+ if isinstance(content, list):
+ text_parts = []
+ for item in content:
+ if isinstance(item, dict):
+ if item.get("type") in ("image", "image_url"):
+ continue
+ if item.get("type") == "text":
+ text_parts.append(item.get("text", ""))
+ elif "text" in item:
+ text_parts.append(item.get("text", ""))
+ elif isinstance(item, str):
+ text_parts.append(item)
+ return "".join(text_parts)
+ return str(content)
+
+
+def extract_images_from_content(content: Any) -> List[Dict[str, Any]]:
+ images: List[Dict[str, Any]] = []
+ if not isinstance(content, list):
+ return images
+
+ for item in content:
+ item_type = item.get("type") if isinstance(item, dict) else None
+ if item_type == "image_url":
+ image_url_obj = item.get("image_url", {}) if isinstance(item, dict) else {}
+ url = image_url_obj.get("url", "") if isinstance(image_url_obj, dict) else ""
+ if url.startswith("data:"):
+ try:
+ header, data = url.split(",", 1)
+ media_part = header.split(";")[0]
+ media_type = media_part.replace("data:", "")
+ if data:
+ images.append({"media_type": media_type, "data": data})
+ except (ValueError, IndexError) as exc:
+ lib_logger.warning(f"Failed to parse image data URL: {exc}")
+ elif item_type == "image":
+ source = item.get("source", {}) if isinstance(item, dict) else {}
+ if isinstance(source, dict):
+ if source.get("type") == "base64":
+ media_type = source.get("media_type", "image/jpeg")
+ data = source.get("data", "")
+ if data:
+ images.append({"media_type": media_type, "data": data})
+ return images
+
+
+def sanitize_json_schema(schema: Optional[Dict[str, Any]]) -> Dict[str, Any]:
+ if not schema:
+ return {}
+ result: Dict[str, Any] = {}
+ for key, value in schema.items():
+ if key == "required" and isinstance(value, list) and len(value) == 0:
+ continue
+ if key == "additionalProperties":
+ continue
+ if key == "properties" and isinstance(value, dict):
+ result[key] = {
+ prop_name: sanitize_json_schema(prop_value)
+ if isinstance(prop_value, dict)
+ else prop_value
+ for prop_name, prop_value in value.items()
+ }
+ elif isinstance(value, dict):
+ result[key] = sanitize_json_schema(value)
+ elif isinstance(value, list):
+ result[key] = [
+ sanitize_json_schema(item) if isinstance(item, dict) else item
+ for item in value
+ ]
+ else:
+ result[key] = value
+ return result
+
+
+def process_tools_with_long_descriptions(
+ tools: Optional[List[UnifiedTool]],
+) -> Tuple[Optional[List[UnifiedTool]], str]:
+ if not tools:
+ return None, ""
+ if TOOL_DESCRIPTION_MAX_LENGTH <= 0:
+ return tools, ""
+
+ tool_documentation_parts = []
+ processed_tools = []
+
+ for tool in tools:
+ description = tool.description or ""
+ if len(description) <= TOOL_DESCRIPTION_MAX_LENGTH:
+ processed_tools.append(tool)
+ continue
+
+ tool_documentation_parts.append(f"## Tool: {tool.name}\n\n{description}")
+ reference_description = (
+ f"[Full documentation in system prompt under '## Tool: {tool.name}']"
+ )
+ processed_tools.append(
+ UnifiedTool(
+ name=tool.name,
+ description=reference_description,
+ input_schema=tool.input_schema,
+ )
+ )
+
+ tool_documentation = ""
+ if tool_documentation_parts:
+ tool_documentation = (
+ "\n\n---\n"
+ "# Tool Documentation\n"
+ "The following tools have detailed documentation that couldn't fit in the tool definition.\n\n"
+ + "\n\n---\n\n".join(tool_documentation_parts)
+ )
+
+ return processed_tools if processed_tools else None, tool_documentation
+
+
+def validate_tool_names(tools: Optional[List[UnifiedTool]]) -> None:
+ if not tools:
+ return
+ problematic = []
+ for tool in tools:
+ if len(tool.name) > 64:
+ problematic.append((tool.name, len(tool.name)))
+ if problematic:
+ tool_list = "\n".join(
+ [f" - '{name}' ({length} characters)" for name, length in problematic]
+ )
+ raise ValueError(
+ "Tool name(s) exceed Kiro API limit of 64 characters:\n"
+ f"{tool_list}\n\n"
+ "Solution: Use shorter tool names (max 64 characters)."
+ )
+
+
+def convert_tools_to_kiro_format(
+ tools: Optional[List[UnifiedTool]],
+) -> List[Dict[str, Any]]:
+ if not tools:
+ return []
+ kiro_tools = []
+ for tool in tools:
+ sanitized_params = sanitize_json_schema(tool.input_schema)
+ description = tool.description or ""
+ if not description.strip():
+ description = f"Tool: {tool.name}"
+ kiro_tools.append(
+ {
+ "toolSpecification": {
+ "name": tool.name,
+ "description": description,
+ "inputSchema": {"json": sanitized_params},
+ }
+ }
+ )
+ return kiro_tools
+
+
+def convert_images_to_kiro_format(
+ images: Optional[List[Dict[str, Any]]],
+) -> List[Dict[str, Any]]:
+ if not images:
+ return []
+ kiro_images = []
+ for img in images:
+ media_type = img.get("media_type", "image/jpeg")
+ data = img.get("data", "")
+ if not data:
+ continue
+ if data.startswith("data:"):
+ try:
+ header, actual_data = data.split(",", 1)
+ media_part = header.split(";")[0]
+ extracted_media_type = media_part.replace("data:", "")
+ if extracted_media_type:
+ media_type = extracted_media_type
+ data = actual_data
+ except (ValueError, IndexError):
+ pass
+ format_str = media_type.split("/")[-1] if "/" in media_type else media_type
+ kiro_images.append({"format": format_str, "source": {"bytes": data}})
+ return kiro_images
+
+
+def convert_tool_results_to_kiro_format(
+ tool_results: List[Dict[str, Any]],
+) -> List[Dict[str, Any]]:
+ kiro_results = []
+ for tr in tool_results:
+ content = tr.get("content", "")
+ content_text = content if isinstance(content, str) else extract_text_content(content)
+ if not content_text:
+ content_text = "(empty result)"
+ kiro_results.append(
+ {
+ "content": [{"text": content_text}],
+ "status": "success",
+ "toolUseId": tr.get("tool_use_id", ""),
+ }
+ )
+ return kiro_results
+
+
+def extract_tool_results_from_content(content: Any) -> List[Dict[str, Any]]:
+ tool_results = []
+ if isinstance(content, list):
+ for item in content:
+ if isinstance(item, dict) and item.get("type") == "tool_result":
+ tool_results.append(
+ {
+ "content": [
+ {
+ "text": extract_text_content(item.get("content", ""))
+ or "(empty result)"
+ }
+ ],
+ "status": "success",
+ "toolUseId": item.get("tool_use_id", ""),
+ }
+ )
+ return tool_results
+
+
+def extract_tool_uses_from_message(
+ content: Any, tool_calls: Optional[List[Dict[str, Any]]] = None
+) -> List[Dict[str, Any]]:
+ tool_uses = []
+ if tool_calls:
+ for tc in tool_calls:
+ func = tc.get("function", {})
+ arguments = func.get("arguments", "{}")
+ if isinstance(arguments, str):
+ try:
+ input_data = json.loads(arguments) if arguments else {}
+ except json.JSONDecodeError:
+ input_data = {}
+ else:
+ input_data = arguments or {}
+ tool_uses.append(
+ {
+ "name": func.get("name", ""),
+ "input": input_data,
+ "toolUseId": tc.get("id", ""),
+ }
+ )
+ if isinstance(content, list):
+ for item in content:
+ if isinstance(item, dict) and item.get("type") == "tool_use":
+ tool_uses.append(
+ {
+ "name": item.get("name", ""),
+ "input": item.get("input", {}),
+ "toolUseId": item.get("id", ""),
+ }
+ )
+ return tool_uses
+
+
+def tool_calls_to_text(tool_calls: List[Dict[str, Any]]) -> str:
+ if not tool_calls:
+ return ""
+ parts = []
+ for tc in tool_calls:
+ func = tc.get("function", {})
+ name = func.get("name", "unknown")
+ arguments = func.get("arguments", "{}")
+ tool_id = tc.get("id", "")
+ if tool_id:
+ parts.append(f"[Tool: {name} ({tool_id})]\n{arguments}")
+ else:
+ parts.append(f"[Tool: {name}]\n{arguments}")
+ return "\n\n".join(parts)
+
+
+def tool_results_to_text(tool_results: List[Dict[str, Any]]) -> str:
+ if not tool_results:
+ return ""
+ parts = []
+ for tr in tool_results:
+ content = tr.get("content", "")
+ tool_use_id = tr.get("tool_use_id", "")
+ content_text = content if isinstance(content, str) else extract_text_content(content)
+ if not content_text:
+ content_text = "(empty result)"
+ if tool_use_id:
+ parts.append(f"[Tool Result ({tool_use_id})]\n{content_text}")
+ else:
+ parts.append(f"[Tool Result]\n{content_text}")
+ return "\n\n".join(parts)
+
+
+def strip_all_tool_content(
+ messages: List[UnifiedMessage],
+) -> Tuple[List[UnifiedMessage], bool]:
+ if not messages:
+ return [], False
+ result = []
+ total_tool_calls_stripped = 0
+ total_tool_results_stripped = 0
+
+ for msg in messages:
+ has_tool_calls = bool(msg.tool_calls)
+ has_tool_results = bool(msg.tool_results)
+ if has_tool_calls or has_tool_results:
+ if has_tool_calls:
+ total_tool_calls_stripped += len(msg.tool_calls)
+ if has_tool_results:
+ total_tool_results_stripped += len(msg.tool_results)
+ existing_content = extract_text_content(msg.content)
+ content_parts = []
+ if existing_content:
+ content_parts.append(existing_content)
+ if has_tool_calls:
+ tool_text = tool_calls_to_text(msg.tool_calls or [])
+ if tool_text:
+ content_parts.append(tool_text)
+ if has_tool_results:
+ result_text = tool_results_to_text(msg.tool_results or [])
+ if result_text:
+ content_parts.append(result_text)
+ content = "\n\n".join(content_parts) if content_parts else "(empty)"
+ result.append(
+ UnifiedMessage(
+ role=msg.role,
+ content=content,
+ tool_calls=None,
+ tool_results=None,
+ images=msg.images,
+ )
+ )
+ else:
+ result.append(msg)
+
+ had_tool_content = total_tool_calls_stripped > 0 or total_tool_results_stripped > 0
+ return result, had_tool_content
+
+
+def ensure_assistant_before_tool_results(
+ messages: List[UnifiedMessage],
+) -> Tuple[List[UnifiedMessage], bool]:
+ if not messages:
+ return [], False
+ result = []
+ converted_any_tool_results = False
+
+ for msg in messages:
+ if msg.tool_results:
+ has_preceding_assistant = (
+ result and result[-1].role == "assistant" and result[-1].tool_calls
+ )
+ if not has_preceding_assistant:
+ tool_results_text = tool_results_to_text(msg.tool_results)
+ original_content = extract_text_content(msg.content) or ""
+ if original_content and tool_results_text:
+ new_content = f"{original_content}\n\n{tool_results_text}"
+ elif tool_results_text:
+ new_content = tool_results_text
+ else:
+ new_content = original_content
+ result.append(
+ UnifiedMessage(
+ role=msg.role,
+ content=new_content,
+ tool_calls=msg.tool_calls,
+ tool_results=None,
+ images=msg.images,
+ )
+ )
+ converted_any_tool_results = True
+ continue
+ result.append(msg)
+
+ return result, converted_any_tool_results
+
+
+def merge_adjacent_messages(messages: List[UnifiedMessage]) -> List[UnifiedMessage]:
+ if not messages:
+ return []
+ merged = []
+ for msg in messages:
+ if not merged:
+ merged.append(msg)
+ continue
+ last = merged[-1]
+ if msg.role == last.role:
+ last_text = extract_text_content(last.content)
+ current_text = extract_text_content(msg.content)
+ last.content = f"{last_text}\n{current_text}"
+ if msg.role == "assistant" and msg.tool_calls:
+ last.tool_calls = (last.tool_calls or []) + list(msg.tool_calls)
+ if msg.role == "user" and msg.tool_results:
+ last.tool_results = (last.tool_results or []) + list(msg.tool_results)
+ else:
+ merged.append(msg)
+ return merged
+
+
+def ensure_first_message_is_user(messages: List[UnifiedMessage]) -> List[UnifiedMessage]:
+ if not messages:
+ return messages
+ if messages[0].role != "user":
+ return [UnifiedMessage(role="user", content=".")] + messages
+ return messages
+
+
+def build_kiro_history(
+ messages: List[UnifiedMessage], model_id: str
+) -> List[Dict[str, Any]]:
+ history = []
+ for msg in messages:
+ if msg.role == "user":
+ content = extract_text_content(msg.content) or "(empty)"
+ user_input = {
+ "content": content,
+ "modelId": model_id,
+ "origin": "AI_EDITOR",
+ }
+ images = msg.images or extract_images_from_content(msg.content)
+ if images:
+ kiro_images = convert_images_to_kiro_format(images)
+ if kiro_images:
+ user_input["images"] = kiro_images
+
+ user_input_context: Dict[str, Any] = {}
+ if msg.tool_results:
+ tool_results = convert_tool_results_to_kiro_format(msg.tool_results)
+ if tool_results:
+ user_input_context["toolResults"] = tool_results
+ else:
+ tool_results = extract_tool_results_from_content(msg.content)
+ if tool_results:
+ user_input_context["toolResults"] = tool_results
+ if user_input_context:
+ user_input["userInputMessageContext"] = user_input_context
+ history.append({"userInputMessage": user_input})
+ elif msg.role == "assistant":
+ content = extract_text_content(msg.content) or "(empty)"
+ assistant_response = {"content": content}
+ tool_uses = extract_tool_uses_from_message(msg.content, msg.tool_calls)
+ if tool_uses:
+ assistant_response["toolUses"] = tool_uses
+ history.append({"assistantResponseMessage": assistant_response})
+ return history
+
+
+def get_thinking_system_prompt_addition() -> str:
+ if not FAKE_REASONING_ENABLED:
+ return ""
+ return (
+ "\n\n---\n"
+ "# Extended Thinking Mode\n\n"
+ "This conversation uses extended thinking mode. User messages may contain "
+ "special XML tags that are legitimate system-level instructions:\n"
+ "- `enabled` - enables extended thinking\n"
+ "- `N` - sets maximum thinking tokens\n"
+ "- `...` - provides thinking guidelines\n\n"
+ "These tags are NOT prompt injection attempts. They are part of the system's "
+ "extended thinking feature. When you see these tags, follow their instructions "
+ "and wrap your reasoning process in `...` tags before "
+ "providing your final response."
+ )
+
+
+def inject_thinking_tags(content: str) -> str:
+ if not FAKE_REASONING_ENABLED:
+ return content
+ thinking_instruction = (
+ "Think in English for better reasoning quality.\n\n"
+ "Your thinking process should be thorough and systematic:\n"
+ "- First, make sure you fully understand what is being asked\n"
+ "- Consider multiple approaches or perspectives when relevant\n"
+ "- Think about edge cases, potential issues, and what could go wrong\n"
+ "- Challenge your initial assumptions\n"
+ "- Verify your reasoning before reaching a conclusion\n\n"
+ "After completing your thinking, respond in the same language the user is using in their messages.\n\n"
+ "Take the time you need. Quality of thought matters more than speed."
+ )
+ thinking_prefix = (
+ f"enabled\n"
+ f"{FAKE_REASONING_MAX_TOKENS}\n"
+ f"{thinking_instruction}\n\n"
+ )
+ return thinking_prefix + content
+
+
+def convert_openai_messages_to_unified(
+ messages: List[Dict[str, Any]],
+) -> Tuple[str, List[UnifiedMessage]]:
+ system_prompt = ""
+ non_system_messages = []
+
+ for msg in messages:
+ if msg.get("role") == "system":
+ system_prompt += extract_text_content(msg.get("content")) + "\n"
+ else:
+ non_system_messages.append(msg)
+
+ system_prompt = system_prompt.strip()
+
+ processed: List[UnifiedMessage] = []
+ pending_tool_results: List[Dict[str, Any]] = []
+ pending_tool_images: List[Dict[str, Any]] = []
+
+ for msg in non_system_messages:
+ role = msg.get("role")
+ content = msg.get("content")
+
+ if role == "tool":
+ tool_result = {
+ "type": "tool_result",
+ "tool_use_id": msg.get("tool_call_id", "") or "",
+ "content": extract_text_content(content) or "(empty result)",
+ }
+ pending_tool_results.append(tool_result)
+ tool_images = extract_images_from_content(content)
+ if tool_images:
+ pending_tool_images.extend(tool_images)
+ continue
+
+ if pending_tool_results:
+ processed.append(
+ UnifiedMessage(
+ role="user",
+ content="",
+ tool_results=pending_tool_results.copy(),
+ images=pending_tool_images.copy() if pending_tool_images else None,
+ )
+ )
+ pending_tool_results.clear()
+ pending_tool_images.clear()
+
+ tool_calls = None
+ tool_results = None
+ images = None
+
+ if role == "assistant":
+ tool_calls = []
+ for tc in msg.get("tool_calls", []) or []:
+ func = tc.get("function", {}) if isinstance(tc, dict) else {}
+ tool_calls.append(
+ {
+ "id": tc.get("id", ""),
+ "type": "function",
+ "function": {
+ "name": func.get("name", ""),
+ "arguments": func.get("arguments", "{}"),
+ },
+ }
+ )
+ if not tool_calls:
+ tool_calls = None
+ elif role == "user":
+ tool_results = []
+ if isinstance(content, list):
+ for item in content:
+ if isinstance(item, dict) and item.get("type") == "tool_result":
+ tool_results.append(
+ {
+ "type": "tool_result",
+ "tool_use_id": item.get("tool_use_id", ""),
+ "content": extract_text_content(item.get("content", ""))
+ or "(empty result)",
+ }
+ )
+ if not tool_results:
+ tool_results = None
+ images = extract_images_from_content(content) or None
+
+ processed.append(
+ UnifiedMessage(
+ role=role,
+ content=extract_text_content(content),
+ tool_calls=tool_calls,
+ tool_results=tool_results,
+ images=images,
+ )
+ )
+
+ if pending_tool_results:
+ processed.append(
+ UnifiedMessage(
+ role="user",
+ content="",
+ tool_results=pending_tool_results.copy(),
+ images=pending_tool_images.copy() if pending_tool_images else None,
+ )
+ )
+
+ return system_prompt, processed
+
+
+def convert_openai_tools_to_unified(
+ tools: Optional[List[Dict[str, Any]]],
+) -> Optional[List[UnifiedTool]]:
+ if not tools:
+ return None
+ unified_tools: List[UnifiedTool] = []
+ for tool in tools:
+ if tool.get("type") != "function" and tool.get("name") is None:
+ continue
+ if tool.get("function"):
+ func = tool.get("function", {})
+ unified_tools.append(
+ UnifiedTool(
+ name=func.get("name", ""),
+ description=func.get("description"),
+ input_schema=func.get("parameters"),
+ )
+ )
+ elif tool.get("name"):
+ unified_tools.append(
+ UnifiedTool(
+ name=tool.get("name", ""),
+ description=tool.get("description"),
+ input_schema=tool.get("input_schema"),
+ )
+ )
+ return unified_tools if unified_tools else None
+
+
+def build_kiro_payload(
+ messages: List[UnifiedMessage],
+ system_prompt: str,
+ model_id: str,
+ tools: Optional[List[UnifiedTool]],
+ conversation_id: str,
+ profile_arn: str,
+ inject_thinking: bool = True,
+) -> KiroPayloadResult:
+ processed_tools, tool_documentation = process_tools_with_long_descriptions(tools)
+ validate_tool_names(processed_tools)
+
+ full_system_prompt = system_prompt
+ if tool_documentation:
+ full_system_prompt = (
+ full_system_prompt + tool_documentation
+ if full_system_prompt
+ else tool_documentation.strip()
+ )
+
+ thinking_system_addition = get_thinking_system_prompt_addition()
+ if thinking_system_addition:
+ full_system_prompt = (
+ full_system_prompt + thinking_system_addition
+ if full_system_prompt
+ else thinking_system_addition.strip()
+ )
+
+ if not tools:
+ messages_without_tools, had_tool_content = strip_all_tool_content(messages)
+ messages_with_assistants = messages_without_tools
+ converted_tool_results = had_tool_content
+ else:
+ messages_with_assistants, converted_tool_results = (
+ ensure_assistant_before_tool_results(messages)
+ )
+
+ merged_messages = merge_adjacent_messages(messages_with_assistants)
+ merged_messages = ensure_first_message_is_user(merged_messages)
+
+ if not merged_messages:
+ raise ValueError("No messages to send")
+
+ history_messages = merged_messages[:-1] if len(merged_messages) > 1 else []
+ if full_system_prompt and history_messages:
+ first_msg = history_messages[0]
+ if first_msg.role == "user":
+ original_content = extract_text_content(first_msg.content)
+ first_msg.content = f"{full_system_prompt}\n\n{original_content}"
+
+ history = build_kiro_history(history_messages, model_id)
+
+ current_message = merged_messages[-1]
+ current_content = extract_text_content(current_message.content)
+ if full_system_prompt and not history:
+ current_content = f"{full_system_prompt}\n\n{current_content}"
+
+ if current_message.role == "assistant":
+ history.append(
+ {
+ "assistantResponseMessage": {"content": current_content or "(empty)"}
+ }
+ )
+ current_content = "Continue"
+
+ if not current_content:
+ current_content = "Continue"
+
+ images = current_message.images or extract_images_from_content(
+ current_message.content
+ )
+ kiro_images = convert_images_to_kiro_format(images) if images else None
+
+ user_input_context: Dict[str, Any] = {}
+ kiro_tools = convert_tools_to_kiro_format(processed_tools)
+ if kiro_tools:
+ user_input_context["tools"] = kiro_tools
+
+ if current_message.tool_results:
+ kiro_tool_results = convert_tool_results_to_kiro_format(
+ current_message.tool_results
+ )
+ if kiro_tool_results:
+ user_input_context["toolResults"] = kiro_tool_results
+ else:
+ tool_results = extract_tool_results_from_content(current_message.content)
+ if tool_results:
+ user_input_context["toolResults"] = tool_results
+
+ if inject_thinking and current_message.role == "user" and not converted_tool_results:
+ current_content = inject_thinking_tags(current_content)
+
+ user_input_message = {
+ "content": current_content,
+ "modelId": model_id,
+ "origin": "AI_EDITOR",
+ }
+ if kiro_images:
+ user_input_message["images"] = kiro_images
+ if user_input_context:
+ user_input_message["userInputMessageContext"] = user_input_context
+
+ payload = {
+ "conversationState": {
+ "chatTriggerType": "MANUAL",
+ "conversationId": conversation_id,
+ "currentMessage": {"userInputMessage": user_input_message},
+ }
+ }
+ if history:
+ payload["conversationState"]["history"] = history
+ if profile_arn:
+ payload["profileArn"] = profile_arn
+
+ return KiroPayloadResult(payload=payload, tool_documentation=tool_documentation)
diff --git a/src/rotator_library/providers/utilities/kiro_http_client.py b/src/rotator_library/providers/utilities/kiro_http_client.py
new file mode 100644
index 00000000..b6550eb0
--- /dev/null
+++ b/src/rotator_library/providers/utilities/kiro_http_client.py
@@ -0,0 +1,119 @@
+# SPDX-License-Identifier: LGPL-3.0-only
+# Copyright (c) 2026 Mirrowel
+
+import asyncio
+import logging
+from typing import Optional
+
+import httpx
+
+from .kiro_utils import (
+ BASE_RETRY_DELAY,
+ FIRST_TOKEN_MAX_RETRIES,
+ MAX_RETRIES,
+ STREAMING_READ_TIMEOUT,
+ get_kiro_headers,
+)
+from ..kiro_auth_base import KiroAuthManager
+
+
+lib_logger = logging.getLogger("rotator_library")
+
+
+class KiroHttpClient:
+ def __init__(
+ self,
+ auth_manager: KiroAuthManager,
+ shared_client: Optional[httpx.AsyncClient] = None,
+ ) -> None:
+ self.auth_manager = auth_manager
+ self._shared_client = shared_client
+ self._owns_client = shared_client is None
+ self.client: Optional[httpx.AsyncClient] = shared_client
+
+ async def _get_client(self, stream: bool = False) -> httpx.AsyncClient:
+ if self._shared_client is not None:
+ return self._shared_client
+
+ if self.client is None or self.client.is_closed:
+ if stream:
+ timeout_config = httpx.Timeout(
+ connect=30.0,
+ read=STREAMING_READ_TIMEOUT,
+ write=30.0,
+ pool=30.0,
+ )
+ else:
+ timeout_config = httpx.Timeout(timeout=300.0)
+ self.client = httpx.AsyncClient(
+ timeout=timeout_config, follow_redirects=True
+ )
+ return self.client
+
+ async def close(self) -> None:
+ if not self._owns_client:
+ return
+ if self.client and not self.client.is_closed:
+ try:
+ await self.client.aclose()
+ except Exception as exc:
+ lib_logger.warning(f"Error closing HTTP client: {exc}")
+
+ async def request_with_retry(
+ self,
+ method: str,
+ url: str,
+ json_data: dict,
+ stream: bool = False,
+ ) -> httpx.Response:
+ max_retries = FIRST_TOKEN_MAX_RETRIES if stream else MAX_RETRIES
+ client = await self._get_client(stream=stream)
+ last_error: Optional[Exception] = None
+
+ for attempt in range(max_retries):
+ try:
+ token = await self.auth_manager.get_access_token()
+ headers = get_kiro_headers(self.auth_manager, token)
+
+ if stream:
+ headers["Connection"] = "close"
+ req = client.build_request(
+ method, url, json=json_data, headers=headers
+ )
+ response = await client.send(req, stream=True)
+ else:
+ response = await client.request(
+ method, url, json=json_data, headers=headers
+ )
+
+ if response.status_code == 200:
+ return response
+
+ if response.status_code == 403:
+ await self.auth_manager.force_refresh()
+ continue
+
+ if response.status_code in (429,) or 500 <= response.status_code < 600:
+ delay = BASE_RETRY_DELAY * (2**attempt)
+ await asyncio.sleep(delay)
+ continue
+
+ return response
+
+ except (httpx.TimeoutException, httpx.RequestError) as exc:
+ last_error = exc
+ if attempt < max_retries - 1:
+ delay = BASE_RETRY_DELAY * (2**attempt)
+ await asyncio.sleep(delay)
+ continue
+ break
+
+ if last_error:
+ raise last_error
+ raise RuntimeError("Kiro request failed after retries")
+
+ async def __aenter__(self) -> "KiroHttpClient":
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+ await self.close()
diff --git a/src/rotator_library/providers/utilities/kiro_streaming.py b/src/rotator_library/providers/utilities/kiro_streaming.py
new file mode 100644
index 00000000..bbb5cba0
--- /dev/null
+++ b/src/rotator_library/providers/utilities/kiro_streaming.py
@@ -0,0 +1,744 @@
+# SPDX-License-Identifier: LGPL-3.0-only
+# Copyright (c) 2026 Mirrowel
+
+import asyncio
+import json
+import logging
+import time
+from dataclasses import dataclass, field
+from enum import IntEnum
+from typing import Any, AsyncGenerator, Dict, List, Optional
+
+import httpx
+
+from .kiro_utils import (
+ FAKE_REASONING_ENABLED,
+ FAKE_REASONING_HANDLING,
+ FAKE_REASONING_INITIAL_BUFFER_SIZE,
+ FAKE_REASONING_OPEN_TAGS,
+ FIRST_TOKEN_MAX_RETRIES,
+ FIRST_TOKEN_TIMEOUT,
+ generate_completion_id,
+ generate_tool_call_id,
+)
+
+
+lib_logger = logging.getLogger("rotator_library")
+
+
+def find_matching_brace(text: str, start_pos: int) -> int:
+ if start_pos >= len(text) or text[start_pos] != "{":
+ return -1
+ brace_count = 0
+ in_string = False
+ escape_next = False
+ for i in range(start_pos, len(text)):
+ char = text[i]
+ if escape_next:
+ escape_next = False
+ continue
+ if char == "\\" and in_string:
+ escape_next = True
+ continue
+ if char == '"' and not escape_next:
+ in_string = not in_string
+ continue
+ if not in_string:
+ if char == "{":
+ brace_count += 1
+ elif char == "}":
+ brace_count -= 1
+ if brace_count == 0:
+ return i
+ return -1
+
+
+def parse_bracket_tool_calls(response_text: str) -> List[Dict[str, Any]]:
+ if not response_text or "[Called" not in response_text:
+ return []
+ tool_calls = []
+ pattern = r"\[Called\s+(\w+)\s+with\s+args:\s*"
+ import re
+
+ for match in re.finditer(pattern, response_text, re.IGNORECASE):
+ func_name = match.group(1)
+ args_start = match.end()
+ json_start = response_text.find("{", args_start)
+ if json_start == -1:
+ continue
+ json_end = find_matching_brace(response_text, json_start)
+ if json_end == -1:
+ continue
+ json_str = response_text[json_start : json_end + 1]
+ try:
+ args = json.loads(json_str)
+ tool_calls.append(
+ {
+ "id": generate_tool_call_id(),
+ "type": "function",
+ "function": {
+ "name": func_name,
+ "arguments": json.dumps(args),
+ },
+ }
+ )
+ except json.JSONDecodeError:
+ lib_logger.warning(
+ f"Failed to parse tool call arguments: {json_str[:100]}"
+ )
+ return tool_calls
+
+
+def deduplicate_tool_calls(tool_calls: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ by_id: Dict[str, Dict[str, Any]] = {}
+ for tc in tool_calls:
+ tc_id = tc.get("id", "")
+ if not tc_id:
+ continue
+ existing = by_id.get(tc_id)
+ if existing is None:
+ by_id[tc_id] = tc
+ else:
+ existing_args = existing.get("function", {}).get("arguments", "{}")
+ current_args = tc.get("function", {}).get("arguments", "{}")
+ if current_args != "{}" and (
+ existing_args == "{}" or len(current_args) > len(existing_args)
+ ):
+ by_id[tc_id] = tc
+
+ result_with_id = list(by_id.values())
+ result_without_id = [tc for tc in tool_calls if not tc.get("id")]
+
+ seen = set()
+ unique = []
+ for tc in result_with_id + result_without_id:
+ func = tc.get("function") or {}
+ func_name = func.get("name") or ""
+ func_args = func.get("arguments") or "{}"
+ key = f"{func_name}-{func_args}"
+ if key not in seen:
+ seen.add(key)
+ unique.append(tc)
+ return unique
+
+
+class AwsEventStreamParser:
+ EVENT_PATTERNS = [
+ ('{"content":', "content"),
+ ('{"name":', "tool_start"),
+ ('{"input":', "tool_input"),
+ ('{"stop":', "tool_stop"),
+ ('{"usage":', "usage"),
+ ('{"contextUsagePercentage":', "context_usage"),
+ ]
+
+ def __init__(self) -> None:
+ self.buffer = ""
+ self.last_content: Optional[str] = None
+ self.current_tool_call: Optional[Dict[str, Any]] = None
+ self.tool_calls: List[Dict[str, Any]] = []
+
+ def feed(self, chunk: bytes) -> List[Dict[str, Any]]:
+ try:
+ self.buffer += chunk.decode("utf-8", errors="ignore")
+ except Exception:
+ return []
+
+ events = []
+ while True:
+ earliest_pos = -1
+ earliest_type = None
+ for pattern, event_type in self.EVENT_PATTERNS:
+ pos = self.buffer.find(pattern)
+ if pos != -1 and (earliest_pos == -1 or pos < earliest_pos):
+ earliest_pos = pos
+ earliest_type = event_type
+ if earliest_pos == -1:
+ break
+
+ json_end = find_matching_brace(self.buffer, earliest_pos)
+ if json_end == -1:
+ break
+
+ json_str = self.buffer[earliest_pos : json_end + 1]
+ self.buffer = self.buffer[json_end + 1 :]
+ try:
+ data = json.loads(json_str)
+ event = self._process_event(data, earliest_type)
+ if event:
+ events.append(event)
+ except json.JSONDecodeError:
+ lib_logger.warning(f"Failed to parse JSON: {json_str[:100]}")
+
+ return events
+
+ def _process_event(self, data: dict, event_type: str) -> Optional[Dict[str, Any]]:
+ if event_type == "content":
+ return self._process_content_event(data)
+ if event_type == "tool_start":
+ return self._process_tool_start_event(data)
+ if event_type == "tool_input":
+ return self._process_tool_input_event(data)
+ if event_type == "tool_stop":
+ return self._process_tool_stop_event(data)
+ if event_type == "usage":
+ return {"type": "usage", "data": data.get("usage", 0)}
+ if event_type == "context_usage":
+ return {"type": "context_usage", "data": data.get("contextUsagePercentage", 0)}
+ return None
+
+ def _process_content_event(self, data: dict) -> Optional[Dict[str, Any]]:
+ content = data.get("content", "")
+ if data.get("followupPrompt"):
+ return None
+ if content == self.last_content:
+ return None
+ self.last_content = content
+ return {"type": "content", "data": content}
+
+ def _process_tool_start_event(self, data: dict) -> Optional[Dict[str, Any]]:
+ if self.current_tool_call:
+ self._finalize_tool_call()
+
+ input_data = data.get("input", "")
+ if isinstance(input_data, dict):
+ input_str = json.dumps(input_data)
+ else:
+ input_str = str(input_data) if input_data else ""
+
+ self.current_tool_call = {
+ "id": data.get("toolUseId", generate_tool_call_id()),
+ "type": "function",
+ "function": {
+ "name": data.get("name", ""),
+ "arguments": input_str,
+ },
+ }
+
+ if data.get("stop"):
+ self._finalize_tool_call()
+ return None
+
+ def _process_tool_input_event(self, data: dict) -> Optional[Dict[str, Any]]:
+ if self.current_tool_call:
+ input_data = data.get("input", "")
+ if isinstance(input_data, dict):
+ input_str = json.dumps(input_data)
+ else:
+ input_str = str(input_data) if input_data else ""
+ self.current_tool_call["function"]["arguments"] += input_str
+ return None
+
+ def _process_tool_stop_event(self, data: dict) -> Optional[Dict[str, Any]]:
+ if self.current_tool_call and data.get("stop"):
+ self._finalize_tool_call()
+ return None
+
+ def _finalize_tool_call(self) -> None:
+ if not self.current_tool_call:
+ return
+ args = self.current_tool_call["function"].get("arguments", "")
+ if isinstance(args, str):
+ if args.strip():
+ try:
+ parsed = json.loads(args)
+ self.current_tool_call["function"]["arguments"] = json.dumps(parsed)
+ except json.JSONDecodeError:
+ self.current_tool_call["function"]["arguments"] = "{}"
+ else:
+ self.current_tool_call["function"]["arguments"] = "{}"
+ elif isinstance(args, dict):
+ self.current_tool_call["function"]["arguments"] = json.dumps(args)
+ else:
+ self.current_tool_call["function"]["arguments"] = "{}"
+ self.tool_calls.append(self.current_tool_call)
+ self.current_tool_call = None
+
+ def get_tool_calls(self) -> List[Dict[str, Any]]:
+ if self.current_tool_call:
+ self._finalize_tool_call()
+ return deduplicate_tool_calls(self.tool_calls)
+
+
+class ParserState(IntEnum):
+ PRE_CONTENT = 0
+ IN_THINKING = 1
+ STREAMING = 2
+
+
+@dataclass
+class ThinkingParseResult:
+ thinking_content: Optional[str] = None
+ regular_content: Optional[str] = None
+ is_first_thinking_chunk: bool = False
+ is_last_thinking_chunk: bool = False
+ state_changed: bool = False
+
+
+class ThinkingParser:
+ def __init__(
+ self,
+ handling_mode: Optional[str] = None,
+ open_tags: Optional[List[str]] = None,
+ initial_buffer_size: int = FAKE_REASONING_INITIAL_BUFFER_SIZE,
+ ) -> None:
+ self.handling_mode = handling_mode or FAKE_REASONING_HANDLING
+ self.open_tags = open_tags or FAKE_REASONING_OPEN_TAGS
+ self.initial_buffer_size = initial_buffer_size
+ self.max_tag_length = max(len(tag) for tag in self.open_tags) * 2
+
+ self.state = ParserState.PRE_CONTENT
+ self.initial_buffer = ""
+ self.thinking_buffer = ""
+ self.open_tag: Optional[str] = None
+ self.close_tag: Optional[str] = None
+ self.is_first_thinking_chunk = True
+ self._thinking_block_found = False
+
+ def feed(self, content: str) -> ThinkingParseResult:
+ result = ThinkingParseResult()
+ if not content:
+ return result
+
+ if self.state == ParserState.PRE_CONTENT:
+ result = self._handle_pre_content(content)
+
+ if self.state == ParserState.IN_THINKING and not result.state_changed:
+ result = self._handle_in_thinking(content)
+
+ if self.state == ParserState.STREAMING and not result.state_changed:
+ result.regular_content = content
+
+ return result
+
+ def _handle_pre_content(self, content: str) -> ThinkingParseResult:
+ result = ThinkingParseResult()
+ self.initial_buffer += content
+ stripped = self.initial_buffer.lstrip()
+
+ for tag in self.open_tags:
+ if stripped.startswith(tag):
+ self.state = ParserState.IN_THINKING
+ self.open_tag = tag
+ self.close_tag = f"{tag[1:]}"
+ self._thinking_block_found = True
+ result.state_changed = True
+
+ content_after_tag = stripped[len(tag) :]
+ self.thinking_buffer = content_after_tag
+ self.initial_buffer = ""
+
+ thinking_result = self._process_thinking_buffer()
+ if thinking_result.thinking_content:
+ result.thinking_content = thinking_result.thinking_content
+ result.is_first_thinking_chunk = (
+ thinking_result.is_first_thinking_chunk
+ )
+ if thinking_result.is_last_thinking_chunk:
+ result.is_last_thinking_chunk = True
+ if thinking_result.regular_content:
+ result.regular_content = thinking_result.regular_content
+ return result
+
+ for tag in self.open_tags:
+ if tag.startswith(stripped) and len(stripped) < len(tag):
+ return result
+
+ if len(self.initial_buffer) > self.initial_buffer_size:
+ self.state = ParserState.STREAMING
+ result.state_changed = True
+ result.regular_content = self.initial_buffer
+ self.initial_buffer = ""
+ return result
+
+ def _handle_in_thinking(self, content: str) -> ThinkingParseResult:
+ self.thinking_buffer += content
+ return self._process_thinking_buffer()
+
+ def _process_thinking_buffer(self) -> ThinkingParseResult:
+ result = ThinkingParseResult()
+ if not self.close_tag:
+ return result
+
+ if self.close_tag in self.thinking_buffer:
+ idx = self.thinking_buffer.find(self.close_tag)
+ thinking_content = self.thinking_buffer[:idx]
+ after_tag = self.thinking_buffer[idx + len(self.close_tag) :]
+ if thinking_content:
+ result.thinking_content = thinking_content
+ result.is_first_thinking_chunk = self.is_first_thinking_chunk
+ self.is_first_thinking_chunk = False
+ result.is_last_thinking_chunk = True
+ self.state = ParserState.STREAMING
+ result.state_changed = True
+ self.thinking_buffer = ""
+ if after_tag:
+ stripped_after = after_tag.lstrip()
+ if stripped_after:
+ result.regular_content = stripped_after
+ return result
+
+ if len(self.thinking_buffer) > self.max_tag_length:
+ send_part = self.thinking_buffer[: -self.max_tag_length]
+ self.thinking_buffer = self.thinking_buffer[-self.max_tag_length :]
+ result.thinking_content = send_part
+ result.is_first_thinking_chunk = self.is_first_thinking_chunk
+ self.is_first_thinking_chunk = False
+ return result
+
+ def finalize(self) -> ThinkingParseResult:
+ result = ThinkingParseResult()
+ if self.thinking_buffer:
+ if self.state == ParserState.IN_THINKING:
+ result.thinking_content = self.thinking_buffer
+ result.is_first_thinking_chunk = self.is_first_thinking_chunk
+ result.is_last_thinking_chunk = True
+ else:
+ result.regular_content = self.thinking_buffer
+ self.thinking_buffer = ""
+ if self.initial_buffer:
+ result.regular_content = (result.regular_content or "") + self.initial_buffer
+ self.initial_buffer = ""
+ return result
+
+ @property
+ def found_thinking_block(self) -> bool:
+ return self._thinking_block_found
+
+ def process_for_output(
+ self, thinking_content: Optional[str], is_first: bool, is_last: bool
+ ) -> Optional[str]:
+ if not thinking_content:
+ return None
+ if self.handling_mode == "remove":
+ return None
+ if self.handling_mode == "pass":
+ prefix = self.open_tag if is_first and self.open_tag else ""
+ suffix = self.close_tag if is_last and self.close_tag else ""
+ return f"{prefix}{thinking_content}{suffix}"
+ if self.handling_mode == "strip_tags":
+ return thinking_content
+ return thinking_content
+
+
+@dataclass
+class KiroEvent:
+ type: str
+ content: Optional[str] = None
+ thinking_content: Optional[str] = None
+ tool_use: Optional[Dict[str, Any]] = None
+ usage: Optional[Dict[str, Any]] = None
+ context_usage_percentage: Optional[float] = None
+ is_first_thinking_chunk: bool = False
+ is_last_thinking_chunk: bool = False
+
+
+@dataclass
+class StreamResult:
+ content: str = ""
+ thinking_content: str = ""
+ tool_calls: List[Dict[str, Any]] = field(default_factory=list)
+ usage: Optional[Dict[str, Any]] = None
+ context_usage_percentage: Optional[float] = None
+
+
+class FirstTokenTimeoutError(Exception):
+ pass
+
+
+async def parse_kiro_stream(
+ response: httpx.Response,
+ first_token_timeout: float = FIRST_TOKEN_TIMEOUT,
+ enable_thinking_parser: bool = True,
+) -> AsyncGenerator[KiroEvent, None]:
+ parser = AwsEventStreamParser()
+ thinking_parser: Optional[ThinkingParser] = None
+ if FAKE_REASONING_ENABLED and enable_thinking_parser:
+ thinking_parser = ThinkingParser(handling_mode=FAKE_REASONING_HANDLING)
+
+ byte_iterator = response.aiter_bytes()
+ try:
+ first_byte_chunk = await asyncio.wait_for(
+ byte_iterator.__anext__(), timeout=first_token_timeout
+ )
+ except asyncio.TimeoutError:
+ raise FirstTokenTimeoutError(
+ f"No response within {first_token_timeout} seconds"
+ )
+ except StopAsyncIteration:
+ return
+
+ async for event in _process_chunk(parser, first_byte_chunk, thinking_parser):
+ yield event
+
+ async for chunk in byte_iterator:
+ async for event in _process_chunk(parser, chunk, thinking_parser):
+ yield event
+
+ if thinking_parser:
+ final_result = thinking_parser.finalize()
+ if final_result.thinking_content:
+ processed_thinking = thinking_parser.process_for_output(
+ final_result.thinking_content,
+ final_result.is_first_thinking_chunk,
+ final_result.is_last_thinking_chunk,
+ )
+ if processed_thinking:
+ yield KiroEvent(
+ type="thinking",
+ thinking_content=processed_thinking,
+ is_first_thinking_chunk=final_result.is_first_thinking_chunk,
+ is_last_thinking_chunk=final_result.is_last_thinking_chunk,
+ )
+ if final_result.regular_content:
+ yield KiroEvent(type="content", content=final_result.regular_content)
+
+ for tc in parser.get_tool_calls():
+ yield KiroEvent(type="tool_use", tool_use=tc)
+
+
+async def _process_chunk(
+ parser: AwsEventStreamParser,
+ chunk: bytes,
+ thinking_parser: Optional[ThinkingParser],
+) -> AsyncGenerator[KiroEvent, None]:
+ events = parser.feed(chunk)
+ for event in events:
+ if event["type"] == "content":
+ content = event["data"]
+ if thinking_parser:
+ parse_result = thinking_parser.feed(content)
+ if parse_result.thinking_content:
+ processed_thinking = thinking_parser.process_for_output(
+ parse_result.thinking_content,
+ parse_result.is_first_thinking_chunk,
+ parse_result.is_last_thinking_chunk,
+ )
+ if processed_thinking:
+ yield KiroEvent(
+ type="thinking",
+ thinking_content=processed_thinking,
+ is_first_thinking_chunk=parse_result.is_first_thinking_chunk,
+ is_last_thinking_chunk=parse_result.is_last_thinking_chunk,
+ )
+ if parse_result.regular_content:
+ yield KiroEvent(type="content", content=parse_result.regular_content)
+ else:
+ yield KiroEvent(type="content", content=content)
+ elif event["type"] == "usage":
+ yield KiroEvent(type="usage", usage=event["data"])
+ elif event["type"] == "context_usage":
+ yield KiroEvent(
+ type="context_usage", context_usage_percentage=event["data"]
+ )
+
+
+async def collect_stream_to_result(
+ response: httpx.Response,
+ first_token_timeout: float = FIRST_TOKEN_TIMEOUT,
+ enable_thinking_parser: bool = True,
+) -> StreamResult:
+ result = StreamResult()
+ full_content_for_bracket_tools = ""
+ async for event in parse_kiro_stream(
+ response, first_token_timeout, enable_thinking_parser
+ ):
+ if event.type == "content" and event.content:
+ result.content += event.content
+ full_content_for_bracket_tools += event.content
+ elif event.type == "thinking" and event.thinking_content:
+ result.thinking_content += event.thinking_content
+ full_content_for_bracket_tools += event.thinking_content
+ elif event.type == "tool_use" and event.tool_use:
+ result.tool_calls.append(event.tool_use)
+ elif event.type == "usage" and event.usage:
+ result.usage = event.usage
+ elif event.type == "context_usage" and event.context_usage_percentage is not None:
+ result.context_usage_percentage = event.context_usage_percentage
+
+ bracket_tool_calls = parse_bracket_tool_calls(full_content_for_bracket_tools)
+ if bracket_tool_calls:
+ result.tool_calls = deduplicate_tool_calls(
+ result.tool_calls + bracket_tool_calls
+ )
+ return result
+
+
+async def stream_kiro_to_openai_chunks(
+ response: httpx.Response,
+ model: str,
+ first_token_timeout: float = FIRST_TOKEN_TIMEOUT,
+) -> AsyncGenerator[Dict[str, Any], None]:
+ completion_id = generate_completion_id()
+ created_time = int(time.time())
+ first_chunk = True
+ full_content = ""
+ full_thinking_content = ""
+ tool_calls_from_stream: List[Dict[str, Any]] = []
+ try:
+ async for event in parse_kiro_stream(response, first_token_timeout):
+ if event.type == "content" and event.content:
+ full_content += event.content
+ delta = {"content": event.content}
+ if first_chunk:
+ delta["role"] = "assistant"
+ first_chunk = False
+ yield {
+ "id": completion_id,
+ "object": "chat.completion.chunk",
+ "created": created_time,
+ "model": model,
+ "choices": [{"index": 0, "delta": delta, "finish_reason": None}],
+ }
+ elif event.type == "thinking" and event.thinking_content:
+ full_thinking_content += event.thinking_content
+ if FAKE_REASONING_HANDLING == "as_reasoning_content":
+ delta = {"reasoning_content": event.thinking_content}
+ elif FAKE_REASONING_HANDLING == "remove":
+ continue
+ else:
+ delta = {"content": event.thinking_content}
+ if first_chunk:
+ delta["role"] = "assistant"
+ first_chunk = False
+ yield {
+ "id": completion_id,
+ "object": "chat.completion.chunk",
+ "created": created_time,
+ "model": model,
+ "choices": [{"index": 0, "delta": delta, "finish_reason": None}],
+ }
+ elif event.type == "tool_use" and event.tool_use:
+ tool_calls_from_stream.append(event.tool_use)
+ finally:
+ try:
+ await response.aclose()
+ except Exception:
+ pass
+
+ bracket_tool_calls = parse_bracket_tool_calls(full_content)
+ all_tool_calls = deduplicate_tool_calls(tool_calls_from_stream + bracket_tool_calls)
+ if all_tool_calls:
+ indexed_tool_calls = []
+ for idx, tc in enumerate(all_tool_calls):
+ func = tc.get("function") or {}
+ indexed_tool_calls.append(
+ {
+ "index": idx,
+ "id": tc.get("id"),
+ "type": tc.get("type", "function"),
+ "function": {
+ "name": func.get("name") or "",
+ "arguments": func.get("arguments") or "{}",
+ },
+ }
+ )
+ yield {
+ "id": completion_id,
+ "object": "chat.completion.chunk",
+ "created": created_time,
+ "model": model,
+ "choices": [
+ {
+ "index": 0,
+ "delta": {"tool_calls": indexed_tool_calls},
+ "finish_reason": None,
+ }
+ ],
+ }
+
+ finish_reason = "tool_calls" if all_tool_calls else "stop"
+ yield {
+ "id": completion_id,
+ "object": "chat.completion.chunk",
+ "created": created_time,
+ "model": model,
+ "choices": [{"index": 0, "delta": {}, "finish_reason": finish_reason}],
+ "usage": {
+ "prompt_tokens": 0,
+ "completion_tokens": 1,
+ "total_tokens": 1,
+ },
+ }
+
+
+async def collect_stream_response(
+ response: httpx.Response,
+ model: str,
+ first_token_timeout: float = FIRST_TOKEN_TIMEOUT,
+) -> Dict[str, Any]:
+ result = await collect_stream_to_result(
+ response, first_token_timeout=first_token_timeout
+ )
+ message: Dict[str, Any] = {"role": "assistant", "content": result.content}
+ if result.thinking_content and FAKE_REASONING_HANDLING == "as_reasoning_content":
+ message["reasoning_content"] = result.thinking_content
+ if result.tool_calls:
+ cleaned_tool_calls = []
+ for tc in result.tool_calls:
+ func = tc.get("function") or {}
+ cleaned_tool_calls.append(
+ {
+ "id": tc.get("id"),
+ "type": tc.get("type", "function"),
+ "function": {
+ "name": func.get("name", ""),
+ "arguments": func.get("arguments", "{}"),
+ },
+ }
+ )
+ message["tool_calls"] = cleaned_tool_calls
+
+ finish_reason = "tool_calls" if result.tool_calls else "stop"
+ return {
+ "id": generate_completion_id(),
+ "object": "chat.completion",
+ "created": int(time.time()),
+ "model": model,
+ "choices": [
+ {
+ "index": 0,
+ "message": message,
+ "finish_reason": finish_reason,
+ }
+ ],
+ "usage": {
+ "prompt_tokens": 0,
+ "completion_tokens": 1,
+ "total_tokens": 1,
+ },
+ }
+
+
+async def stream_with_first_token_retry(
+ make_request,
+ stream_processor,
+ max_retries: int = FIRST_TOKEN_MAX_RETRIES,
+ first_token_timeout: float = FIRST_TOKEN_TIMEOUT,
+):
+ last_error: Optional[Exception] = None
+ for attempt in range(max_retries):
+ response: Optional[httpx.Response] = None
+ try:
+ response = await make_request()
+ if response.status_code != 200:
+ error_text = await response.aread()
+ raise RuntimeError(
+ f"Upstream API error ({response.status_code}): {error_text!r}"
+ )
+ async for chunk in stream_processor(response):
+ yield chunk
+ return
+ except FirstTokenTimeoutError as exc:
+ last_error = exc
+ if response:
+ try:
+ await response.aclose()
+ except Exception:
+ pass
+ continue
+ if last_error:
+ raise last_error
+ raise RuntimeError(
+ f"Model did not respond within {first_token_timeout}s after {max_retries} attempts."
+ )
diff --git a/src/rotator_library/providers/utilities/kiro_utils.py b/src/rotator_library/providers/utilities/kiro_utils.py
new file mode 100644
index 00000000..4921cd83
--- /dev/null
+++ b/src/rotator_library/providers/utilities/kiro_utils.py
@@ -0,0 +1,129 @@
+# SPDX-License-Identifier: LGPL-3.0-only
+# Copyright (c) 2026 Mirrowel
+
+import hashlib
+import json
+import logging
+import os
+import uuid
+from typing import Any, Dict, List, Optional
+
+
+lib_logger = logging.getLogger("rotator_library")
+
+
+KIRO_REFRESH_URL_TEMPLATE = "https://prod.{region}.auth.desktop.kiro.dev/refreshToken"
+AWS_SSO_OIDC_URL_TEMPLATE = "https://oidc.{region}.amazonaws.com/token"
+KIRO_API_HOST_TEMPLATE = "https://q.{region}.amazonaws.com"
+KIRO_Q_HOST_TEMPLATE = "https://q.{region}.amazonaws.com"
+
+TOKEN_REFRESH_THRESHOLD = int(os.getenv("KIRO_TOKEN_REFRESH_THRESHOLD", "600"))
+FIRST_TOKEN_TIMEOUT = float(os.getenv("KIRO_FIRST_TOKEN_TIMEOUT", "20"))
+FIRST_TOKEN_MAX_RETRIES = int(os.getenv("KIRO_FIRST_TOKEN_MAX_RETRIES", "3"))
+MAX_RETRIES = int(os.getenv("KIRO_MAX_RETRIES", "3"))
+BASE_RETRY_DELAY = float(os.getenv("KIRO_BASE_RETRY_DELAY", "1"))
+STREAMING_READ_TIMEOUT = float(os.getenv("KIRO_STREAMING_READ_TIMEOUT", "300"))
+
+TOOL_DESCRIPTION_MAX_LENGTH = int(os.getenv("KIRO_TOOL_DESCRIPTION_MAX_LENGTH", "400"))
+
+FAKE_REASONING_ENABLED = os.getenv("KIRO_FAKE_REASONING_ENABLED", "false").lower() in (
+ "true",
+ "1",
+ "yes",
+)
+FAKE_REASONING_MAX_TOKENS = int(os.getenv("KIRO_FAKE_REASONING_MAX_TOKENS", "1024"))
+FAKE_REASONING_HANDLING = os.getenv(
+ "KIRO_FAKE_REASONING_HANDLING", "as_reasoning_content"
+)
+FAKE_REASONING_OPEN_TAGS = [
+ "",
+ "",
+ "",
+]
+FAKE_REASONING_INITIAL_BUFFER_SIZE = int(
+ os.getenv("KIRO_FAKE_REASONING_INITIAL_BUFFER_SIZE", "200")
+)
+
+
+def get_kiro_refresh_url(region: str) -> str:
+ return KIRO_REFRESH_URL_TEMPLATE.format(region=region)
+
+
+def get_aws_sso_oidc_url(region: str) -> str:
+ return AWS_SSO_OIDC_URL_TEMPLATE.format(region=region)
+
+
+def get_kiro_api_host(region: str) -> str:
+ return KIRO_API_HOST_TEMPLATE.format(region=region)
+
+
+def get_kiro_q_host(region: str) -> str:
+ return KIRO_Q_HOST_TEMPLATE.format(region=region)
+
+
+def get_machine_fingerprint() -> str:
+ try:
+ import socket
+ import getpass
+
+ hostname = socket.gethostname()
+ username = getpass.getuser()
+ unique_string = f"{hostname}-{username}-kiro-gateway"
+ return hashlib.sha256(unique_string.encode()).hexdigest()
+ except Exception as exc:
+ lib_logger.warning(f"Failed to get machine fingerprint: {exc}")
+ return hashlib.sha256(b"default-kiro-gateway").hexdigest()
+
+
+def get_kiro_headers(auth_manager: Any, token: str) -> Dict[str, str]:
+ fingerprint = getattr(auth_manager, "fingerprint", "unknown")
+ return {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json",
+ "User-Agent": (
+ "aws-sdk-js/1.0.27 ua/2.1 os/win32#10.0.19044 lang/js "
+ "md/nodejs#22.21.1 api/codewhispererstreaming#1.0.27 "
+ f"m/E KiroIDE-0.7.45-{fingerprint}"
+ ),
+ "x-amz-user-agent": f"aws-sdk-js/1.0.27 KiroIDE-0.7.45-{fingerprint}",
+ "x-amzn-codewhisperer-optout": "true",
+ "x-amzn-kiro-agent-mode": "vibe",
+ "amz-sdk-invocation-id": str(uuid.uuid4()),
+ "amz-sdk-request": "attempt=1; max=3",
+ }
+
+
+def generate_completion_id() -> str:
+ return f"chatcmpl-{uuid.uuid4().hex}"
+
+
+def generate_conversation_id(messages: Optional[List[Dict[str, Any]]] = None) -> str:
+ if not messages:
+ return str(uuid.uuid4())
+
+ if len(messages) <= 3:
+ key_messages = messages
+ else:
+ key_messages = messages[:3] + [messages[-1]]
+
+ simplified_messages = []
+ for msg in key_messages:
+ role = msg.get("role", "unknown")
+ content = msg.get("content", "")
+
+ if isinstance(content, str):
+ content_str = content[:100]
+ elif isinstance(content, list):
+ content_str = json.dumps(content, sort_keys=True)[:100]
+ else:
+ content_str = str(content)[:100]
+
+ simplified_messages.append({"role": role, "content": content_str})
+
+ content_json = json.dumps(simplified_messages, sort_keys=True)
+ hash_digest = hashlib.sha256(content_json.encode()).hexdigest()
+ return hash_digest[:16]
+
+
+def generate_tool_call_id() -> str:
+ return f"call_{uuid.uuid4().hex[:8]}"