diff --git a/contributing/samples/cli_coding_agent/.gitignore b/contributing/samples/cli_coding_agent/.gitignore new file mode 100644 index 0000000000..61f2dc9f84 --- /dev/null +++ b/contributing/samples/cli_coding_agent/.gitignore @@ -0,0 +1 @@ +**/__pycache__/ diff --git a/contributing/samples/cli_coding_agent/README.md b/contributing/samples/cli_coding_agent/README.md new file mode 100644 index 0000000000..4cdaafb0f6 --- /dev/null +++ b/contributing/samples/cli_coding_agent/README.md @@ -0,0 +1,76 @@ +# ADK CLI Coding Agent + +An AI coding assistant built with Google ADK featuring file system operations, task planning, and colorful interactive CLI output. + +**This is a toy example! It is not meant to be a replacement for fully featured products like the Gemini CLI or Claude Code.** + + +![alt text](image.png) + +## Features + + +**File System Tools** +- `read_file` - Read file contents with metadata +- `write_file` - Create or overwrite files +- `update_file` - Replace text within files +- `list_directory` - List files/directories with glob patterns + +**Task Planning** +- `create_plan` - Break down complex tasks into steps +- `update_plan` - Mark tasks as completed +- `get_plan` - View current plan and progress +- `reset_plan` - Clear completed plan + +**Colorful Terminal UI** +- Interactive CLI with color-coded output +- Progress tracking with visual task lists +- Concise/verbose output modes + +## Setup + +1. **Navigate to this sample directory:** + ```bash + cd contributing/samples/cli_coding_agent + ``` + +2. **Install dependencies:** + ```bash + uv pip install -r requirements.txt + ``` + +3. **Authenticate with Google Cloud:** + ```bash + gcloud auth application-default login + ``` + +4. **Run the agent:** + ```bash + uv run python -m agent + ``` + + Or use ADK's built-in web interface: + ```bash + adk web agent + ``` + +## Usage + +The agent automatically creates plans for multi-step tasks (3+ steps) and tracks progress: + +``` +[You]: Help me refactor this codebase + +📋 PLAN: Refactor Codebase +Progress: [████████░░░░] 2/3 (67%) + ✓ [0] Analyze current structure + ✓ [1] Extract common utilities + ○ [2] Update imports and tests + +🔧 update_file(file_path=utils.py, old_text=...) +✓ update_file +``` + +**Commands:** +- `exit/quit` - Exit the assistant +- `verbose` - Toggle verbose tool output diff --git a/contributing/samples/cli_coding_agent/agent/__init__.py b/contributing/samples/cli_coding_agent/agent/__init__.py new file mode 100644 index 0000000000..8b04bd6f20 --- /dev/null +++ b/contributing/samples/cli_coding_agent/agent/__init__.py @@ -0,0 +1,65 @@ +"""Coding agent package for file system operations with ADK.""" + +import logging +from pathlib import Path +import sys + +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + + +def setup_logging( + level=logging.INFO, log_file='coding_agent.log', quiet_adk=True +): + """Configure logging for the coding agent package. + + Args: + level: Logging level (default: INFO) + log_file: Path to log file (default: 'coding_agent.log') + quiet_adk: If True, set ADK loggers to WARNING level (default: True) + """ + # Create formatters + detailed_formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + ) + + # Console handler (simple format) + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(level) + console_handler.setFormatter(detailed_formatter) + + # File handler (detailed format) + log_path = Path(log_file) + file_handler = logging.FileHandler(log_path) + file_handler.setLevel(logging.DEBUG) # Log everything to file + file_handler.setFormatter(detailed_formatter) + + # Configure root logger + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + root_logger.addHandler(console_handler) + root_logger.addHandler(file_handler) + + # Quiet down noisy libraries + if quiet_adk: + logging.getLogger('google_adk').setLevel(logging.WARNING) + logging.getLogger('google.genai').setLevel(logging.WARNING) + logging.getLogger('google_genai.types').setLevel(logging.ERROR) + logging.getLogger('httpx').setLevel(logging.WARNING) + logging.getLogger('httpcore').setLevel(logging.WARNING) + logging.getLogger('google_genai.models').setLevel(logging.WARNING) + + return logging.getLogger(__name__) + + +# Initialize logging +logger = setup_logging() +logger.info('Coding agent package initialized') + +# Import agent after logging is configured +from . import agent + +__all__ = ['agent', 'logger', 'setup_logging'] diff --git a/contributing/samples/cli_coding_agent/agent/__main__.py b/contributing/samples/cli_coding_agent/agent/__main__.py new file mode 100644 index 0000000000..85dc2f175c --- /dev/null +++ b/contributing/samples/cli_coding_agent/agent/__main__.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python3 +"""Interactive CLI runner for the coding agent with colorful output.""" + +import asyncio +import json +from typing import Any + +from coding_agent.agent import coding_agent +from google.adk.runners import InMemoryRunner +from google.adk.utils.context_utils import Aclosing +from google.genai import types + + +# ANSI color codes for terminal output +class Colors: + """ANSI color codes for terminal formatting.""" + + # Text colors + RED = "\033[91m" + GREEN = "\033[92m" + YELLOW = "\033[93m" + BLUE = "\033[94m" + MAGENTA = "\033[95m" + CYAN = "\033[96m" + WHITE = "\033[97m" + GRAY = "\033[90m" + + # Styles + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + + # Reset + RESET = "\033[0m" + + @classmethod + def color(cls, text: str, color: str, bold: bool = False) -> str: + """Apply color to text.""" + style = cls.BOLD if bold else "" + return f"{style}{color}{text}{cls.RESET}" + + +def print_header(text: str): + """Print a header with formatting.""" + print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.RESET}") + print(f"{Colors.BOLD}{Colors.CYAN}{text.center(60)}{Colors.RESET}") + print(f"{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.RESET}\n") + + +def print_section(title: str): + """Print a section divider.""" + print(f"\n{Colors.BOLD}{Colors.BLUE}▶ {title}{Colors.RESET}") + print(f"{Colors.GRAY}{'─'*60}{Colors.RESET}") + + +def print_event(event_type: str, author: str | None = None): + """Print event metadata.""" + icon_map = { + "ModelTurn": "🤖", + "ToolUse": "🔧", + "ToolResult": "✅", + "Error": "❌", + "TurnComplete": "✨", + } + icon = icon_map.get(event_type, "📨") + + event_color = Colors.YELLOW if "Tool" in event_type else Colors.CYAN + print(f"\n{icon} {Colors.color(event_type, event_color, bold=True)}", end="") + + if author: + print(f" {Colors.GRAY}[{author}]{Colors.RESET}", end="") + print() + + +def print_text(text: str): + """Print assistant text response.""" + print(f"{Colors.GREEN}💬 Response:{Colors.RESET}") + # Indent the text for better readability + for line in text.split("\n"): + print(f" {line}") + + +def truncate_arg(value: Any, max_len: int = 20) -> str: + """Truncate argument value if too long.""" + str_value = str(value) + if len(str_value) > max_len: + return f"{str_value[:max_len]}..." + return str_value + + +def print_tool_call(name: str, args: dict): + """Print tool call information in concise function-call format.""" + if args: + # Format as function call with truncated args + arg_strs = [f"{k}={truncate_arg(v)}" for k, v in args.items()] + args_display = ", ".join(arg_strs) + print( + f"{Colors.MAGENTA}🔧 {Colors.BOLD}{name}({args_display}){Colors.RESET}" + ) + else: + print(f"{Colors.MAGENTA}🔧 {Colors.BOLD}{name}(){Colors.RESET}") + + +def render_plan(plan_data: dict): + """Render a plan in a special visual format. + + Args: + plan_data: Plan data structure with title and tasks + """ + if not plan_data or "plan" not in plan_data: + return + + plan = plan_data.get("plan") + if not plan: + return + + # Print plan header + print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.RESET}") + print(f"{Colors.BOLD}{Colors.CYAN}📋 PLAN: {plan['title']}{Colors.RESET}") + print(f"{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.RESET}\n") + + # Calculate progress + completed_count = sum(1 for t in plan["tasks"] if t["completed"]) + total_count = len(plan["tasks"]) + progress_pct = (completed_count / total_count * 100) if total_count > 0 else 0 + + # Progress bar + bar_width = 40 + filled = ( + int(bar_width * completed_count / total_count) if total_count > 0 else 0 + ) + bar = "█" * filled + "░" * (bar_width - filled) + print( + f"{Colors.GRAY}Progress: [{Colors.GREEN}{bar}{Colors.GRAY}]" + f" {completed_count}/{total_count} ({progress_pct:.0f}%){Colors.RESET}\n" + ) + + # Print tasks + for task in plan["tasks"]: + task_id = task["id"] + description = task["description"] + completed = task["completed"] + + if completed: + # Use dim/gray text for completed tasks + print( + f"{Colors.GREEN} ✓ [{task_id}]" + f" {Colors.GRAY}{description}{Colors.RESET}" + ) + else: + print( + f"{Colors.YELLOW} ○ [{task_id}]" + f" {Colors.WHITE}{description}{Colors.RESET}" + ) + + print(f"\n{Colors.GRAY}{'─'*60}{Colors.RESET}\n") + + +def print_tool_response(name: str, response: Any, verbose: bool = False): + """Print tool response information. + + Args: + name: Tool name + response: Tool response + verbose: If True, show full response. If False, show checkmark/error only. + """ + # Check if this is a plan-related response and render specially + if isinstance(response, dict) and "plan" in response and response.get("plan"): + # Always render plan visually + render_plan(response) + return + + if verbose: + print(f"{Colors.GREEN}✅ {Colors.BOLD}{name}{Colors.RESET}") + # Format the response nicely + if isinstance(response, dict): + print(f"{Colors.GRAY} Result:{Colors.RESET}") + formatted_response = json.dumps(response, indent=4) + for line in formatted_response.split("\n"): + print(f" {line}") + elif isinstance(response, list): + print(f"{Colors.GRAY} Result:{Colors.RESET}") + formatted_response = json.dumps(response, indent=4) + for line in formatted_response.split("\n"): + print(f" {line}") + else: + print(f"{Colors.GRAY} Result:{Colors.RESET} {response}") + else: + # Concise mode: just show success/failure + if isinstance(response, dict) and "error" in response: + print(f"{Colors.RED}✗ {name} failed{Colors.RESET}") + else: + print(f"{Colors.GREEN}✓ {name}{Colors.RESET}") + + +def print_error(message: str): + """Print error message.""" + print(f"{Colors.RED}❌ ERROR:{Colors.RESET} {message}") + + +def print_finish(reason: str): + """Print turn completion message.""" + # Simple newline for cleaner output + print() + + +async def run_cli(): + """Run the coding agent in interactive CLI mode.""" + # Session IDs for the conversation + user_id = "cli_user" + session_id = "coding_session" + + # Verbose mode for tool responses (default: False for concise output) + verbose_responses = False + + # Create runner with the agent + runner = InMemoryRunner(agent=coding_agent, app_name=coding_agent.name) + + # Create session + await runner.session_service.create_session( + app_name=runner.app_name, user_id=user_id, session_id=session_id + ) + + # Print ASCII art banner + ascii_art = """ + ╔═══════════════════════════════════════════════════════════╗ + ║ ║ + ║ █████╗ ██████╗ ██╗ ██╗ ██████╗ ██████╗ ██████╗ ██████╗ ║ + ║ ██╔══██╗██╔══██╗██║ ██╔╝ ██╔════╝██╔═══██╗██╔══██╗██╔══██╗ ║ + ║ ███████║██║ ██║█████╔╝ ██║ ██║ ██║██║ ██║█████╗ ║ + ║ ██╔══██║██║ ██║██╔═██╗ ██║ ██║ ██║██║ ██║██╔══╝ ║ + ║ ██║ ██║██████╔╝██║ ██╗ ╚██████╗╚██████╔╝██████╔╝███████╗ ║ + ║ ╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚══════╝ ║ + ║ ║ + ║ ██████╗██╗ ██╗ ║ + ║ ██╔════╝██║ ██║ ║ + ║ ██║ ██║ ██║ ║ + ║ ██║ ██║ ██║ ║ + ║ ╚██████╗███████╗██║ ║ + ║ ╚═════╝╚══════╝╚═╝ ║ + ║ ║ + ╚═══════════════════════════════════════════════════════════╝ + """ + print(f"{Colors.CYAN}{Colors.BOLD}{ascii_art}{Colors.RESET}") + + print( + f"\n{Colors.CYAN}Agent:{Colors.RESET}" + f" {Colors.BOLD}{coding_agent.name}{Colors.RESET}" + ) + print(f"{Colors.CYAN}Model:{Colors.RESET} {coding_agent.model}") + print( + f"{Colors.CYAN}Output:{Colors.RESET}" + f" {'Verbose' if verbose_responses else 'Concise'} mode" + ) + print(f"\n{Colors.GRAY}Commands:{Colors.RESET}") + print(f" • {Colors.YELLOW}exit/quit{Colors.RESET} - Exit the assistant") + print( + f" • {Colors.YELLOW}verbose{Colors.RESET} - Toggle verbose tool" + " responses" + ) + print(f"\n{Colors.GRAY}File tools:{Colors.RESET}") + print(f" • read_file, write_file, update_file, list_directory") + print(f"\n{Colors.GRAY}Planning tools:{Colors.RESET}") + print(f" • create_plan, update_plan, get_plan, reset_plan") + print() + + while True: + # Get user input + try: + user_input = input(f"{Colors.BOLD}{Colors.BLUE}[You]:{Colors.RESET} ") + except (EOFError, KeyboardInterrupt): + print(f"\n{Colors.YELLOW}Exiting...{Colors.RESET}") + break + + # Check for exit command + if user_input.strip().lower() in ["exit", "quit", "q"]: + print(f"{Colors.YELLOW}Goodbye!{Colors.RESET}") + break + + # Check for verbose toggle + if user_input.strip().lower() == "verbose": + verbose_responses = not verbose_responses + mode = "verbose" if verbose_responses else "concise" + print( + f"{Colors.CYAN}Output mode set to:" + f" {Colors.BOLD}{mode}{Colors.RESET}\n" + ) + continue + + # Skip empty inputs + if not user_input.strip(): + continue + + # Create message content + message = types.Content(role="user", parts=[types.Part(text=user_input)]) + + # Run the agent and stream events + try: + print() # Add blank line before output + + async with Aclosing( + runner.run_async( + user_id=user_id, session_id=session_id, new_message=message + ) + ) as event_stream: + async for event in event_stream: + + # Handle content parts + if event.content and event.content.parts: + for part in event.content.parts: + # Print text content + if part.text: + print_text(part.text) + + # Print function calls + if part.function_call: + print_tool_call( + part.function_call.name, part.function_call.args + ) + + # Print function responses + if part.function_response: + response = getattr(part.function_response, "response", None) + print_tool_response( + part.function_response.name, + response, + verbose=verbose_responses, + ) + + # Print error information if present + if event.error_message: + print_error(event.error_message) + + # Print finish reason if turn is complete + if event.turn_complete and event.finish_reason: + print_finish(event.finish_reason) + + except Exception as e: + print_error(str(e)) + import traceback + + print(f"{Colors.GRAY}{traceback.format_exc()}{Colors.RESET}") + + # Close the runner + await runner.close() + print(f"\n{Colors.CYAN}Session closed. Goodbye!{Colors.RESET}\n") + + +def main(): + """Entry point for the CLI.""" + asyncio.run(run_cli()) + + +if __name__ == "__main__": + main() diff --git a/contributing/samples/cli_coding_agent/agent/agent.py b/contributing/samples/cli_coding_agent/agent/agent.py new file mode 100644 index 0000000000..bed2e0b76e --- /dev/null +++ b/contributing/samples/cli_coding_agent/agent/agent.py @@ -0,0 +1,60 @@ +"""Coding agent with file system tools.""" + +from coding_agent.tools import create_plan +from coding_agent.tools import get_plan +from coding_agent.tools import list_directory +from coding_agent.tools import print_affirming_message +from coding_agent.tools import read_file +from coding_agent.tools import reset_plan +from coding_agent.tools import update_file +from coding_agent.tools import update_plan +from coding_agent.tools import write_file +from google.adk.agents import Agent +from google.adk.tools import FunctionTool + +# Create the coding agent with file system tools +coding_agent = Agent( + name="coding_assistant", + model="gemini-2.5-flash", + description=( + "A helpful coding assistant that can read and write files on the" + " filesystem. I can help you read file contents, create new files," + " update existing files, explore directory structures, and" + " create/manage task plans. I'm here to assist with your coding tasks!" + ), + instruction="""You are a helpful coding assistant with file system and planning capabilities. + +PLANNING GUIDANCE: +- For any multi-step task (3+ steps), ALWAYS create a plan first using create_plan() +- Break down complex tasks into clear, actionable steps +- After completing each task, update the plan using update_plan(task_id, completed=True) +- Use get_plan() to check progress when needed +- Plans help track progress and keep the user informed + +WORKFLOW: +1. When given a complex task, create a plan with all steps +2. Work through tasks sequentially +3. Mark each task complete as you finish it +4. The plan will be displayed visually with progress tracking +5. When you finish all the tasks, reset the plan. + +EXAMPLE: +User: "Help me build a REST API" +1. create_plan(title="Build REST API", tasks=["Set up project structure", "Create models", "Add endpoints", "Write tests", "Add documentation"]) +2. Work on task 0, then update_plan(task_id=0, completed=True) +3. Continue with remaining tasks +4. Once all tasks are done, reset_plan() + +Use the planning tools proactively to provide clear visibility into your work progress.""", + tools=[ + FunctionTool(func=read_file), + FunctionTool(func=write_file), + FunctionTool(func=update_file), + FunctionTool(func=list_directory), + FunctionTool(func=print_affirming_message), + FunctionTool(func=create_plan), + FunctionTool(func=update_plan), + FunctionTool(func=reset_plan), + FunctionTool(func=get_plan), + ], +) diff --git a/contributing/samples/cli_coding_agent/agent/tools.py b/contributing/samples/cli_coding_agent/agent/tools.py new file mode 100644 index 0000000000..d080f78a16 --- /dev/null +++ b/contributing/samples/cli_coding_agent/agent/tools.py @@ -0,0 +1,393 @@ +"""File system tools for the coding agent.""" + +from pathlib import Path +import random +from typing import Optional + +# Global plan state +_current_plan: dict | None = None + + +def read_file(file_path: str) -> dict: + """Read the contents of a file. + + Args: + file_path: Path to the file to read (relative or absolute) + + Returns: + A dictionary containing the file contents and metadata, or error info + """ + try: + path = Path(file_path).expanduser().resolve() + + if not path.exists(): + return { + "error": "file_not_found", + "message": f"File not found: {file_path}", + "path": file_path, + } + + if not path.is_file(): + return { + "error": "not_a_file", + "message": f"Path is not a file: {file_path}", + "path": file_path, + } + + try: + with open(path, "r", encoding="utf-8") as f: + content = f.read() + + return { + "success": True, + "path": str(path), + "content": content, + "size_bytes": path.stat().st_size, + "lines": len(content.splitlines()), + } + except UnicodeDecodeError: + # Try to read as binary if UTF-8 fails + with open(path, "rb") as f: + content_bytes = f.read() + + return { + "success": True, + "path": str(path), + "content": f"", + "size_bytes": len(content_bytes), + "lines": None, + "is_binary": True, + } + except PermissionError as e: + return { + "error": "permission_denied", + "message": f"Permission denied: {file_path}", + "path": file_path, + } + except Exception as e: + return {"error": "unexpected_error", "message": str(e), "path": file_path} + + +def write_file(file_path: str, content: str, create_dirs: bool = True) -> dict: + """Write content to a file, creating it if it doesn't exist. + + Args: + file_path: Path to the file to write (relative or absolute) + content: The content to write to the file + create_dirs: If True, create parent directories if they don't exist + + Returns: + A dictionary with the operation result or error info + """ + try: + path = Path(file_path).expanduser().resolve() + + # Create parent directories if needed + if create_dirs and not path.parent.exists(): + path.parent.mkdir(parents=True, exist_ok=True) + + # Check if file exists + existed = path.exists() + + # Write the file + with open(path, "w", encoding="utf-8") as f: + f.write(content) + + return { + "success": True, + "path": str(path), + "operation": "updated" if existed else "created", + "size_bytes": path.stat().st_size, + "lines": len(content.splitlines()), + } + except PermissionError: + return { + "error": "permission_denied", + "message": f"Permission denied: {file_path}", + "path": file_path, + } + except Exception as e: + return {"error": "unexpected_error", "message": str(e), "path": file_path} + + +def update_file(file_path: str, old_text: str, new_text: str) -> dict: + """Update a file by replacing old text with new text. + + Args: + file_path: Path to the file to update (relative or absolute) + old_text: The text to find and replace + new_text: The text to replace with + + Returns: + A dictionary with the operation result or error info + """ + try: + path = Path(file_path).expanduser().resolve() + + if not path.exists(): + return { + "error": "file_not_found", + "message": f"File not found: {file_path}", + "path": file_path, + } + + # Read current content + with open(path, "r", encoding="utf-8") as f: + content = f.read() + + # Check if old_text exists + if old_text not in content: + return { + "error": "text_not_found", + "message": f"Text not found in file: {old_text[:50]}...", + "path": str(path), + } + + # Count occurrences + occurrences = content.count(old_text) + + # Replace text + new_content = content.replace(old_text, new_text) + + # Write back + with open(path, "w", encoding="utf-8") as f: + f.write(new_content) + + return { + "success": True, + "path": str(path), + "operation": "updated", + "replacements": occurrences, + "size_bytes": path.stat().st_size, + "lines": len(new_content.splitlines()), + } + except PermissionError: + return { + "error": "permission_denied", + "message": f"Permission denied: {file_path}", + "path": file_path, + } + except Exception as e: + return {"error": "unexpected_error", "message": str(e), "path": file_path} + + +def list_directory( + directory_path: str = ".", + pattern: Optional[str] = None, + recursive: bool = False, +) -> dict: + """List files and directories in a directory. + + Args: + directory_path: Path to the directory to list (default: current directory) + pattern: Optional glob pattern to filter files (e.g., "*.py", "**/*.txt") + recursive: If True, list files recursively (default: False) + + Returns: + A dictionary containing lists of files and directories or error info + """ + try: + path = Path(directory_path).expanduser().resolve() + + if not path.exists(): + return { + "error": "directory_not_found", + "message": f"Directory not found: {directory_path}", + "path": directory_path, + } + + if not path.is_dir(): + return { + "error": "not_a_directory", + "message": f"Path is not a directory: {directory_path}", + "path": directory_path, + } + + files = [] + directories = [] + + if pattern: + # Use glob with pattern + glob_pattern = f"**/{pattern}" if recursive else pattern + for item in path.glob(glob_pattern): + rel_path = item.relative_to(path) + if item.is_file(): + files.append( + {"name": str(rel_path), "size_bytes": item.stat().st_size} + ) + elif item.is_dir(): + directories.append(str(rel_path)) + else: + # List all items + if recursive: + for item in path.rglob("*"): + rel_path = item.relative_to(path) + if item.is_file(): + files.append( + {"name": str(rel_path), "size_bytes": item.stat().st_size} + ) + elif item.is_dir(): + directories.append(str(rel_path)) + else: + for item in path.iterdir(): + if item.is_file(): + files.append({"name": item.name, "size_bytes": item.stat().st_size}) + elif item.is_dir(): + directories.append(item.name) + + # Sort for consistent output + files.sort(key=lambda x: x["name"]) + directories.sort() + + return { + "success": True, + "path": str(path), + "files": files, + "directories": directories, + "file_count": len(files), + "directory_count": len(directories), + } + except PermissionError: + return { + "error": "permission_denied", + "message": f"Permission denied: {directory_path}", + "path": directory_path, + } + except Exception as e: + return { + "error": "unexpected_error", + "message": str(e), + "path": directory_path, + } + + +def print_affirming_message() -> str: + """Prints a random affirming message.""" + messages = [ + "You're doing great!", + "Keep up the excellent work!", + "You're making fantastic progress!", + "Awesome job!", + "You're a star!", + "Incredible effort!", + "Keep shining!", + "You're truly amazing!", + "Well done!", + "Fantastic work!", + ] + message = random.choice(messages) + print(f"Affirmation: {message}") + return message + + +# ============================================================================ +# Planning Tools +# ============================================================================ + + +def create_plan(title: str, tasks: list[str]) -> dict: + """Create a new plan with a list of tasks. + + Args: + title: The title/goal of the plan + tasks: List of task descriptions + + Returns: + The created plan with task statuses + """ + global _current_plan + + _current_plan = { + "title": title, + "tasks": [ + {"id": i, "description": task, "completed": False} + for i, task in enumerate(tasks) + ], + } + + return { + "status": "plan_created", + "title": _current_plan["title"], + "total_tasks": len(_current_plan["tasks"]), + "plan": _current_plan, + } + + +def update_plan(task_id: int, completed: bool) -> dict: + """Update the completion status of a task in the plan. + + Args: + task_id: The ID of the task to update (0-based index) + completed: Whether the task is completed + + Returns: + Updated plan information or error info + """ + global _current_plan + + if _current_plan is None: + return { + "error": "no_plan", + "message": "No plan exists. Create a plan first using create_plan.", + } + + if task_id < 0 or task_id >= len(_current_plan["tasks"]): + return { + "error": "invalid_task_id", + "message": ( + f"Invalid task_id: {task_id}. Must be between 0 and" + f" {len(_current_plan['tasks']) - 1}" + ), + "task_id": task_id, + "valid_range": f"0-{len(_current_plan['tasks']) - 1}", + } + + _current_plan["tasks"][task_id]["completed"] = completed + + completed_count = sum(1 for t in _current_plan["tasks"] if t["completed"]) + total_count = len(_current_plan["tasks"]) + + return { + "status": "plan_updated", + "task_id": task_id, + "task_description": _current_plan["tasks"][task_id]["description"], + "completed": completed, + "progress": f"{completed_count}/{total_count}", + "plan": _current_plan, + } + + +def reset_plan() -> dict: + """Reset/clear the current plan. + + Returns: + Confirmation of plan reset + """ + global _current_plan + + had_plan = _current_plan is not None + _current_plan = None + + return {"status": "plan_reset", "had_plan": had_plan} + + +def get_plan() -> dict: + """Get the current plan. + + Returns: + The current plan or None if no plan exists + """ + global _current_plan + + if _current_plan is None: + return {"status": "no_plan", "plan": None} + + completed_count = sum(1 for t in _current_plan["tasks"] if t["completed"]) + total_count = len(_current_plan["tasks"]) + + return { + "status": "plan_exists", + "title": _current_plan["title"], + "progress": f"{completed_count}/{total_count}", + "plan": _current_plan, + } diff --git a/contributing/samples/cli_coding_agent/image.png b/contributing/samples/cli_coding_agent/image.png new file mode 100644 index 0000000000..2cf53ebef1 Binary files /dev/null and b/contributing/samples/cli_coding_agent/image.png differ diff --git a/contributing/samples/cli_coding_agent/requirements.txt b/contributing/samples/cli_coding_agent/requirements.txt new file mode 100644 index 0000000000..e2983d2e08 --- /dev/null +++ b/contributing/samples/cli_coding_agent/requirements.txt @@ -0,0 +1,2 @@ +google-adk +google-genai