diff --git a/src/fastapi_cloud_cli/cli.py b/src/fastapi_cloud_cli/cli.py index 85e22cb..b595d12 100644 --- a/src/fastapi_cloud_cli/cli.py +++ b/src/fastapi_cloud_cli/cli.py @@ -4,6 +4,7 @@ from .commands.env import env_app from .commands.login import login from .commands.logout import logout +from .commands.logs import logs from .commands.unlink import unlink from .commands.whoami import whoami from .logging import setup_logging @@ -25,6 +26,7 @@ # fastapi cloud [command] cloud_app.command()(deploy) cloud_app.command()(login) +cloud_app.command()(logs) cloud_app.command()(logout) cloud_app.command()(whoami) cloud_app.command()(unlink) diff --git a/src/fastapi_cloud_cli/commands/deploy.py b/src/fastapi_cloud_cli/commands/deploy.py index a3c8a88..e94c3f8 100644 --- a/src/fastapi_cloud_cli/commands/deploy.py +++ b/src/fastapi_cloud_cli/commands/deploy.py @@ -19,7 +19,7 @@ from rich_toolkit.menu import Option from fastapi_cloud_cli.commands.login import login -from fastapi_cloud_cli.utils.api import APIClient, BuildLogError, TooManyRetriesError +from fastapi_cloud_cli.utils.api import APIClient, StreamLogError, TooManyRetriesError from fastapi_cloud_cli.utils.apps import AppConfig, get_app_config, write_app_config from fastapi_cloud_cli.utils.auth import Identity from fastapi_cloud_cli.utils.cli import get_rich_toolkit, handle_http_errors @@ -429,7 +429,7 @@ def _wait_for_deployment( last_message_changed_at = time.monotonic() - except (BuildLogError, TooManyRetriesError, TimeoutError) as e: + except (StreamLogError, TooManyRetriesError, TimeoutError) as e: progress.set_error( dedent(f""" [error]Build log streaming failed: {e}[/] @@ -438,7 +438,7 @@ def _wait_for_deployment( """).strip() ) - raise typer.Exit(1) from e + raise typer.Exit(1) from None class SignupToWaitingList(BaseModel): diff --git a/src/fastapi_cloud_cli/commands/logs.py b/src/fastapi_cloud_cli/commands/logs.py new file mode 100644 index 0000000..1076038 --- /dev/null +++ b/src/fastapi_cloud_cli/commands/logs.py @@ -0,0 +1,185 @@ +import logging +import re +from datetime import datetime +from pathlib import Path +from typing import Annotated, Optional + +import typer +from rich.markup import escape +from rich_toolkit import RichToolkit + +from fastapi_cloud_cli.utils.api import ( + APIClient, + AppLogEntry, + StreamLogError, + TooManyRetriesError, +) +from fastapi_cloud_cli.utils.apps import AppConfig, get_app_config +from fastapi_cloud_cli.utils.auth import Identity +from fastapi_cloud_cli.utils.cli import get_rich_toolkit + +logger = logging.getLogger(__name__) + + +LOG_LEVEL_COLORS = { + "debug": "blue", + "info": "cyan", + "warning": "yellow", + "warn": "yellow", + "error": "red", + "critical": "magenta", + "fatal": "magenta", +} + +SINCE_PATTERN = re.compile(r"^\d+[smhd]$") + + +def _validate_since(value: str) -> str: + """Validate the --since parameter format.""" + if not SINCE_PATTERN.match(value): + raise typer.BadParameter( + "Invalid format. Use a number followed by s, m, h, or d (e.g., '5m', '1h', '2d')." + ) + + return value + + +def _format_log_line(log: AppLogEntry) -> str: + """Format a log entry for display with a colored indicator""" + # Parse the timestamp string to format it consistently + timestamp = datetime.fromisoformat(log.timestamp.replace("Z", "+00:00")) + timestamp_str = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + color = LOG_LEVEL_COLORS.get(log.level.lower()) + + message = escape(log.message) + + if color: + return f"[{color}]┃[/{color}] [dim]{timestamp_str}[/dim] {message}" + + return f"[dim]┃[/dim] [dim]{timestamp_str}[/dim] {message}" + + +def _process_log_stream( + toolkit: RichToolkit, + app_config: AppConfig, + tail: int, + since: str, + follow: bool, +) -> None: + """Stream app logs and print them to the console.""" + log_count = 0 + + try: + with APIClient() as client: + for log in client.stream_app_logs( + app_id=app_config.app_id, + tail=tail, + since=since, + follow=follow, + ): + toolkit.print(_format_log_line(log)) + log_count += 1 + + if not follow and log_count == 0: + toolkit.print("No logs found for the specified time range.") + return + except KeyboardInterrupt: # pragma: no cover + toolkit.print_line() + return + except StreamLogError as e: + error_msg = str(e) + if "HTTP 401" in error_msg or "HTTP 403" in error_msg: + toolkit.print( + "The specified token is not valid. Use [blue]`fastapi login`[/] to generate a new token.", + ) + elif "HTTP 404" in error_msg: + toolkit.print( + "App not found. Make sure to use the correct account.", + ) + else: + toolkit.print( + f"[red]Error:[/] {escape(error_msg)}", + ) + raise typer.Exit(1) from None + except (TooManyRetriesError, TimeoutError): + toolkit.print( + "Lost connection to log stream. Please try again later.", + ) + raise typer.Exit(1) from None + + +def logs( + path: Annotated[ + Optional[Path], + typer.Argument( + help="Path to the folder containing the app (defaults to current directory)" + ), + ] = None, + tail: int = typer.Option( + 100, + "--tail", + "-t", + help="Number of log lines to show before streaming.", + show_default=True, + ), + since: str = typer.Option( + "5m", + "--since", + "-s", + help="Show logs since a specific time (e.g., '5m', '1h', '2d').", + show_default=True, + callback=_validate_since, + ), + follow: bool = typer.Option( + True, + "--follow/--no-follow", + "-f", + help="Stream logs in real-time (use --no-follow to fetch and exit).", + ), +) -> None: + """Stream or fetch logs from your deployed app. + + Examples: + fastapi cloud logs # Stream logs in real-time + fastapi cloud logs --no-follow # Fetch recent logs and exit + fastapi cloud logs --tail 50 --since 1h # Last 50 logs from the past hour + """ + identity = Identity() + with get_rich_toolkit(minimal=True) as toolkit: + if not identity.is_logged_in(): + toolkit.print( + "No credentials found. Use [blue]`fastapi login`[/] to login.", + tag="auth", + ) + raise typer.Exit(1) + + app_path = path or Path.cwd() + app_config = get_app_config(app_path) + + if not app_config: + toolkit.print( + "No app linked to this directory. Run [blue]`fastapi deploy`[/] first.", + ) + raise typer.Exit(1) + + logger.debug("Fetching logs for app ID: %s", app_config.app_id) + + if follow: + toolkit.print( + f"Streaming logs for [bold]{app_config.app_id}[/bold] (Ctrl+C to exit)...", + tag="logs", + ) + else: + toolkit.print( + f"Fetching logs for [bold]{app_config.app_id}[/bold]...", + tag="logs", + ) + toolkit.print_line() + + _process_log_stream( + toolkit=toolkit, + app_config=app_config, + tail=tail, + since=since, + follow=follow, + ) diff --git a/src/fastapi_cloud_cli/utils/api.py b/src/fastapi_cloud_cli/utils/api.py index 06cf3fe..896b30a 100644 --- a/src/fastapi_cloud_cli/utils/api.py +++ b/src/fastapi_cloud_cli/utils/api.py @@ -25,11 +25,13 @@ logger = logging.getLogger(__name__) -BUILD_LOG_MAX_RETRIES = 3 -BUILD_LOG_TIMEOUT = timedelta(minutes=5) +STREAM_LOGS_MAX_RETRIES = 3 +STREAM_LOGS_TIMEOUT = timedelta(minutes=5) -class BuildLogError(Exception): +class StreamLogError(Exception): + """Raised when there's an error streaming logs (build or app logs).""" + pass @@ -37,6 +39,12 @@ class TooManyRetriesError(Exception): pass +class AppLogEntry(BaseModel): + timestamp: str + message: str + level: str + + class BuildLogLineGeneric(BaseModel): type: Literal["complete", "failed", "timeout", "heartbeat"] id: Optional[str] = None @@ -91,7 +99,7 @@ def _backoff() -> None: error_detail = error.response.text except Exception: error_detail = "(response body unavailable)" - raise BuildLogError( + raise StreamLogError( f"HTTP {error.response.status_code}: {error_detail}" ) from error @@ -115,7 +123,7 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Generator[T, None, None]: for attempt_number in range(total_attempts): if time.monotonic() - start > timeout.total_seconds(): raise TimeoutError( - f"Build log streaming timed out after {timeout.total_seconds():.0f}s" + f"Log streaming timed out after {timeout.total_seconds():.0f}s" ) with attempt(attempt_number): @@ -144,7 +152,7 @@ def __init__(self) -> None: }, ) - @attempts(BUILD_LOG_MAX_RETRIES, BUILD_LOG_TIMEOUT) + @attempts(STREAM_LOGS_MAX_RETRIES, STREAM_LOGS_TIMEOUT) def stream_build_logs( self, deployment_id: str ) -> Generator[BuildLogLine, None, None]: @@ -192,3 +200,44 @@ def _parse_log_line(self, line: str) -> Optional[BuildLogLine]: except (ValidationError, json.JSONDecodeError) as e: logger.debug("Skipping malformed log: %s (error: %s)", line[:100], e) return None + + @attempts(STREAM_LOGS_MAX_RETRIES, STREAM_LOGS_TIMEOUT) + def stream_app_logs( + self, + app_id: str, + tail: int, + since: str, + follow: bool, + ) -> Generator[AppLogEntry, None, None]: + timeout = 120 if follow else 30 + with self.stream( + "GET", + f"/apps/{app_id}/logs/stream", + params={ + "tail": tail, + "since": since, + "follow": follow, + }, + timeout=timeout, + ) as response: + response.raise_for_status() + for line in response.iter_lines(): + if not line or not line.strip(): # pragma: no cover + continue + try: + data = json.loads(line) + except json.JSONDecodeError: + logger.debug("Failed to parse log line: %s", line) + continue + + if data.get("type") == "heartbeat": + continue + + if data.get("type") == "error": + raise StreamLogError(data.get("message", "Unknown error")) + + try: + yield AppLogEntry.model_validate(data) + except ValidationError as e: # pragma: no cover + logger.debug("Failed to parse log entry: %s - %s", data, e) + continue diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 6a25d3f..c45f9c1 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -9,10 +9,10 @@ from fastapi_cloud_cli.config import Settings from fastapi_cloud_cli.utils.api import ( - BUILD_LOG_MAX_RETRIES, + STREAM_LOGS_MAX_RETRIES, APIClient, - BuildLogError, BuildLogLineMessage, + StreamLogError, TooManyRetriesError, ) from tests.utils import build_logs_response @@ -243,7 +243,7 @@ def test_stream_build_logs_client_error_raises_immediately( ) -> None: logs_route.mock(return_value=Response(404, text="Not Found")) - with pytest.raises(BuildLogError, match="HTTP 404"): + with pytest.raises(StreamLogError, match="HTTP 404"): list(client.stream_build_logs(deployment_id)) @@ -255,7 +255,8 @@ def test_stream_build_logs_max_retries_exceeded( with patch("time.sleep"): with pytest.raises( - TooManyRetriesError, match=f"Failed after {BUILD_LOG_MAX_RETRIES} attempts" + TooManyRetriesError, + match=f"Failed after {STREAM_LOGS_MAX_RETRIES} attempts", ): list(client.stream_build_logs(deployment_id)) @@ -343,7 +344,7 @@ def test_stream_build_logs_connection_closed_without_complete_failed_or_timeout( logs = client.stream_build_logs(deployment_id) with patch("time.sleep"), pytest.raises(TooManyRetriesError, match="Failed after"): - for _ in range(BUILD_LOG_MAX_RETRIES + 1): + for _ in range(STREAM_LOGS_MAX_RETRIES + 1): next(logs) diff --git a/tests/test_cli_deploy.py b/tests/test_cli_deploy.py index 72ea419..5c20ba5 100644 --- a/tests/test_cli_deploy.py +++ b/tests/test_cli_deploy.py @@ -15,7 +15,7 @@ from fastapi_cloud_cli.cli import app from fastapi_cloud_cli.config import Settings -from fastapi_cloud_cli.utils.api import BuildLogError, TooManyRetriesError +from fastapi_cloud_cli.utils.api import StreamLogError, TooManyRetriesError from tests.conftest import ConfiguredApp from tests.utils import Keys, build_logs_response, changing_dir @@ -823,7 +823,7 @@ def test_shows_no_apps_found_message_when_team_has_no_apps( @pytest.mark.parametrize( "error", - [BuildLogError, TooManyRetriesError, TimeoutError], + [StreamLogError, TooManyRetriesError, TimeoutError], ) @pytest.mark.respx(base_url=settings.base_api_url) def test_shows_error_message_on_build_exception( diff --git a/tests/test_logs.py b/tests/test_logs.py new file mode 100644 index 0000000..bf540c4 --- /dev/null +++ b/tests/test_logs.py @@ -0,0 +1,378 @@ +import json +from unittest.mock import patch + +import httpx +import pytest +import respx +from typer.testing import CliRunner + +from fastapi_cloud_cli.cli import cloud_app as app +from fastapi_cloud_cli.config import Settings +from fastapi_cloud_cli.utils.api import TooManyRetriesError +from tests.conftest import ConfiguredApp +from tests.utils import changing_dir + +runner = CliRunner() +settings = Settings.get() + + +def test_shows_message_if_not_logged_in(logged_out_cli: None) -> None: + result = runner.invoke(app, ["logs"]) + + assert result.exit_code == 1 + assert "No credentials found" in result.output + + +def test_shows_message_if_app_not_configured(logged_in_cli: None) -> None: + result = runner.invoke(app, ["logs"]) + + assert result.exit_code == 1 + assert "No app linked to this directory" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_displays_logs( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + log_lines = [ + json.dumps( + { + "timestamp": "2025-12-05T14:32:01.123000Z", + "message": "Application startup complete", + "level": "info", + } + ), + json.dumps( + { + "timestamp": "2025-12-05T14:32:05.456000Z", + "message": "GET /health 200", + "level": "info", + } + ), + ] + response_content = "\n".join(log_lines) + + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(200, content=response_content) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 0 + assert "Fetching logs" in result.output + assert configured_app.app_id in result.output + assert "Application startup complete" in result.output + assert "GET /health 200" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_passes_default_params( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + route = respx_mock.get( + url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*" + ).mock(return_value=httpx.Response(200, content="")) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs"]) + + assert result.exit_code == 0 + url = str(route.calls[0].request.url).lower() + assert "follow=true" in url + assert "tail=100" in url + assert "since=5m" in url + assert "Streaming logs" in result.output + assert configured_app.app_id in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_passes_custom_params( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + route = respx_mock.get( + url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*" + ).mock(return_value=httpx.Response(200, content="")) + + with changing_dir(configured_app.path): + result = runner.invoke( + app, ["logs", "--no-follow", "--tail", "50", "--since", "1h"] + ) + + assert result.exit_code == 0 + url = str(route.calls[0].request.url).lower() + assert "tail=50" in url + assert "since=1h" in url + assert "follow=false" in url + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_displays_all_log_levels( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + log_lines = [ + json.dumps( + { + "timestamp": "2025-12-05T14:32:01.123000Z", + "message": "Debug message", + "level": "debug", + } + ), + json.dumps( + { + "timestamp": "2025-12-05T14:32:02.123000Z", + "message": "Info message", + "level": "info", + } + ), + json.dumps( + { + "timestamp": "2025-12-05T14:32:03.123000Z", + "message": "Warning message", + "level": "warning", + } + ), + json.dumps( + { + "timestamp": "2025-12-05T14:32:04.123000Z", + "message": "Error message", + "level": "error", + } + ), + ] + response_content = "\n".join(log_lines) + + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(200, content=response_content) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 0 + assert "Debug message" in result.output + assert "Info message" in result.output + assert "Warning message" in result.output + assert "Error message" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_handles_401_unauthorized( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(401) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 1 + assert "token is not valid" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_handles_404( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(404) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 1 + assert "App not found" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_shows_message_when_no_logs_found( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(200, content="") + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 0 + assert "No logs found" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_handles_server_error_message( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + log_lines = [ + json.dumps({"type": "error", "message": "Log storage unavailable"}), + ] + response_content = "\n".join(log_lines) + + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(200, content=response_content) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 1 + assert "Log storage unavailable" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_handles_unknown_log_level( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + log_lines = [ + json.dumps( + { + "timestamp": "2025-12-05T14:32:01.123000Z", + "message": "Unknown level message", + "level": "custom_level", + } + ), + ] + response_content = "\n".join(log_lines) + + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(200, content=response_content) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 0 + assert "Unknown level message" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_skips_invalid_json_lines( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + log_lines = [ + "not valid json", + json.dumps( + { + "timestamp": "2025-12-05T14:32:01.123000Z", + "message": "Valid log message", + "level": "info", + } + ), + ] + response_content = "\n".join(log_lines) + + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(200, content=response_content) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 0 + assert "Valid log message" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_skips_heartbeat_messages( + logged_in_cli: None, respx_mock: respx.MockRouter, configured_app: ConfiguredApp +) -> None: + log_lines = [ + json.dumps({"type": "heartbeat"}), + json.dumps( + { + "timestamp": "2025-12-05T14:32:01.123000Z", + "message": "Real log message", + "level": "info", + } + ), + ] + response_content = "\n".join(log_lines) + + respx_mock.get(url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*").mock( + return_value=httpx.Response(200, content=response_content) + ) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 0 + assert "Real log message" in result.output + assert "heartbeat" not in result.output.lower() + + +@pytest.mark.parametrize( + "error", + [TooManyRetriesError, TimeoutError], +) +def test_handles_connection_loss( + logged_in_cli: None, + configured_app: ConfiguredApp, + error: type[Exception], +) -> None: + with ( + changing_dir(configured_app.path), + patch( + "fastapi_cloud_cli.utils.api.APIClient.stream_app_logs", + side_effect=error("Connection lost"), + ), + ): + result = runner.invoke(app, ["logs", "--no-follow"]) + + assert result.exit_code == 1 + assert "Lost connection to log stream" in result.output + + +@pytest.mark.parametrize( + "invalid_since", + [ + "5", # missing unit + "m", # missing number + "5x", # invalid unit + "5min", # invalid unit (should be 'm') + "1hour", # invalid unit (should be 'h') + "5 m", # space not allowed + "-5m", # negative not allowed + "", # empty string + ], +) +def test_rejects_invalid_since_format( + logged_in_cli: None, + configured_app: ConfiguredApp, + invalid_since: str, +) -> None: + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--since", invalid_since]) + + assert result.exit_code == 2 + assert "Invalid format" in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +@pytest.mark.parametrize( + "valid_since", + [ + "30s", # seconds + "5m", # minutes + "1h", # hours + "2d", # days + "100m", # larger numbers + ], +) +def test_accepts_valid_since_format( + logged_in_cli: None, + respx_mock: respx.MockRouter, + configured_app: ConfiguredApp, + valid_since: str, +) -> None: + route = respx_mock.get( + url__regex=rf"/apps/{configured_app.app_id}/logs/stream.*" + ).mock(return_value=httpx.Response(200, content="")) + + with changing_dir(configured_app.path): + result = runner.invoke(app, ["logs", "--no-follow", "--since", valid_since]) + + assert result.exit_code == 0 + url = str(route.calls[0].request.url).lower() + assert f"since={valid_since}" in url