diff --git a/docs/configuration.md b/docs/configuration.md index 44e1a19..dc0e445 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -175,6 +175,59 @@ Maximum items to show in arrays/objects before truncation. Maximum string length before truncation. +## Compression Support + +The debug toolbar middleware automatically handles compressed HTTP responses. The following compression formats are supported: + +### Supported Formats + +| Format | Encoding Header | Library | Availability | +|--------|----------------|---------|--------------| +| **gzip** | `gzip` | `gzip` (stdlib) | Always available | +| **deflate** | `deflate` | `zlib` (stdlib) | Always available | +| **Brotli** | `br` | `brotli` | Optional (install with `pip install brotli`) | +| **Zstandard** | `zstd` | `zstandard` | Optional (install with `pip install zstandard`) | + +### How It Works + +When the middleware encounters a compressed response: + +1. It detects the compression format from the `Content-Encoding` header +2. Decompresses the response body +3. Injects the toolbar HTML +4. Returns the modified response **uncompressed** with no `Content-Encoding` header + +This ensures the toolbar is correctly injected regardless of whether your application uses compression. + +### Multiple Encodings + +The middleware correctly handles comma-separated encoding values (e.g., `Content-Encoding: gzip, identity`). Encodings are processed in reverse order, as per HTTP specification (last applied encoding is first to be removed). + +### Optional Dependencies + +To enable support for Brotli and Zstandard: + +```bash +# Brotli support +pip install debug-toolbar[litestar] brotli + +# Zstandard support +pip install debug-toolbar[litestar] zstandard + +# Both +pip install debug-toolbar[litestar] brotli zstandard +``` + +If these libraries are not installed, the middleware will gracefully skip decompression for those formats and log a debug message. + +### Error Handling + +The middleware includes robust error handling: + +- **Invalid compressed data**: If data claims to be compressed but isn't valid, it falls back to treating it as uncompressed +- **UTF-8 decode errors**: If decompressed data can't be decoded as UTF-8, the response is returned as-is +- **Missing libraries**: If optional compression libraries aren't installed, those formats are skipped gracefully + ## Environment-Based Configuration Common pattern for different environments: diff --git a/src/debug_toolbar/litestar/middleware.py b/src/debug_toolbar/litestar/middleware.py index ff4658b..a92c714 100644 --- a/src/debug_toolbar/litestar/middleware.py +++ b/src/debug_toolbar/litestar/middleware.py @@ -2,9 +2,11 @@ from __future__ import annotations +import gzip import logging import re import time +import zlib from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Literal, cast from uuid import uuid4 @@ -15,6 +17,17 @@ from debug_toolbar.litestar.panels.events import collect_events_metadata from litestar.middleware import AbstractMiddleware +# Optional compression libraries +try: + import brotli +except ImportError: + brotli = None # type: ignore[assignment] + +try: + import zstandard +except ImportError: + zstandard = None # type: ignore[assignment] + if TYPE_CHECKING: from litestar.types import ( ASGIApp, @@ -190,20 +203,27 @@ async def _handle_response_body( async def _send_html_response(self, send: Send, context: RequestContext, state: ResponseState) -> None: """Process and send buffered HTML response with toolbar injection.""" full_body = b"".join(state.body_chunks) + content_encoding = state.headers.get("content-encoding", "") try: await self.toolbar.process_response(context) - modified_body = self._inject_toolbar(full_body, context) + modified_body, new_encoding = self._inject_toolbar(full_body, context, content_encoding) server_timing = self.toolbar.get_server_timing_header(context) except Exception: logger.debug("Toolbar processing failed, sending original response", exc_info=True) modified_body = full_body + new_encoding = content_encoding server_timing = None + # Build headers, excluding content-length (recalculated) and content-encoding (may have changed) + excluded_headers = {"content-length", "content-encoding"} new_headers: list[tuple[bytes, bytes]] = [ - (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() != "content-length" + (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() not in excluded_headers ] new_headers.append((b"content-length", str(len(modified_body)).encode())) + # Only add content-encoding if we still have one (not stripped due to decompression) + if new_encoding: + new_headers.append((b"content-encoding", new_encoding.encode())) if server_timing: new_headers.append((b"server-timing", server_timing.encode())) @@ -471,20 +491,86 @@ def _populate_routes_metadata(self, request: Request, context: RequestContext) - context.metadata["routes"] = [] context.metadata["matched_route"] = "" - def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes: + def _decompress_body(self, body: bytes, encodings: list[str]) -> tuple[bytes, bool]: # noqa: PLR0912 + """Decompress response body based on content-encoding. + + Args: + body: The compressed response body. + encodings: List of encoding formats (e.g., ["gzip", "deflate"]). + + Returns: + Tuple of (decompressed body, success flag). + Success flag is True if decompression succeeded, False otherwise. + """ + decompressed = False + # Process encodings in reverse order (last applied encoding is first to remove) + for encoding in reversed(encodings): + if encoding == "gzip": + try: + body = gzip.decompress(body) + decompressed = True + except (gzip.BadGzipFile, OSError): + # Not valid gzip, try next encoding or decode as-is + break + elif encoding == "deflate": + try: + body = zlib.decompress(body) + decompressed = True + except zlib.error: + # Not valid deflate, try next encoding or decode as-is + break + elif encoding == "br": + if brotli is not None: + try: + body = brotli.decompress(body) + decompressed = True + except brotli.error: # type: ignore[attr-defined] + # Not valid brotli, try next encoding or decode as-is + break + else: + # Brotli not available, can't decompress + logger.debug("Brotli compression detected but brotli library not installed") + break + elif encoding == "zstd": + if zstandard is not None: + try: + dctx = zstandard.ZstdDecompressor() + body = dctx.decompress(body) + decompressed = True + except zstandard.ZstdError: + # Not valid zstd, try next encoding or decode as-is + break + else: + # Zstandard not available, can't decompress + logger.debug("Zstandard compression detected but zstandard library not installed") + break + return body, decompressed + + def _inject_toolbar(self, body: bytes, context: RequestContext, content_encoding: str = "") -> tuple[bytes, str]: """Inject the toolbar HTML into the response body. Args: - body: The original response body. + body: The original response body (may be compressed). context: The request context with collected data. + content_encoding: The content-encoding header value (e.g., "gzip", "deflate", "br", "zstd"). Returns: - The modified response body with toolbar injected. + Tuple of (modified body, content_encoding to use). + If compression was decompressed, returns uncompressed body with empty encoding. """ + # Handle compressed responses + encodings = [e.strip() for e in content_encoding.lower().split(",")] if content_encoding else [] + body, decompressed = self._decompress_body(body, encodings) + try: html = body.decode("utf-8") except UnicodeDecodeError: - return body + # Can't decode. If we successfully decompressed, return the + # decompressed body with no content-encoding. Otherwise, return + # the body as-is with the original encoding. + if decompressed: + return body, "" + return body, content_encoding toolbar_data = self.toolbar.get_toolbar_data(context) toolbar_html = self._render_toolbar(toolbar_data) @@ -496,7 +582,10 @@ def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes: pattern = re.compile(re.escape(insert_before), re.IGNORECASE) html = pattern.sub(toolbar_html + insert_before, html, count=1) - return html.encode("utf-8") + # Return body as uncompressed UTF-8 with empty content-encoding. + # This applies to all successful toolbar injections, regardless of whether + # the input was originally compressed (we decompress before processing). + return html.encode("utf-8"), "" def _render_toolbar(self, data: dict[str, Any]) -> str: """Render the toolbar HTML. diff --git a/tests/integration/test_litestar_middleware.py b/tests/integration/test_litestar_middleware.py index 870f59c..c7283ec 100644 --- a/tests/integration/test_litestar_middleware.py +++ b/tests/integration/test_litestar_middleware.py @@ -2,11 +2,14 @@ from __future__ import annotations +import gzip + import pytest +from litestar.status_codes import HTTP_200_OK from litestar.testing import TestClient from debug_toolbar.litestar import DebugToolbarPlugin, LitestarDebugToolbarConfig -from litestar import Litestar, MediaType, get +from litestar import Litestar, MediaType, Response, get @get("/", media_type=MediaType.HTML) @@ -274,7 +277,7 @@ def test_works_with_before_after_request(self) -> None: Note: We only verify before_request hook is called. The after_request hook timing varies in CI environments due to async execution order. """ - from litestar import Request, Response + from litestar import Request hook_state: dict[str, bool] = {"before": False, "after": False} @@ -298,3 +301,342 @@ async def after_request(response: Response) -> Response: assert response.status_code == 200 assert b"debug-toolbar" in response.content assert hook_state["before"], "before_request hook was not called" + + +class TestGzipCompression: + """Test toolbar injection with gzip-compressed responses.""" + + def test_toolbar_injected_with_gzip_compression(self) -> None: + """Test that toolbar is correctly injected when response is gzip-compressed. + + This tests the fix for the issue where gzip-compressed responses would + fail to have the toolbar injected because the middleware couldn't decode + the compressed bytes as UTF-8. + """ + from litestar.config.compression import CompressionConfig + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[html_handler], + plugins=[DebugToolbarPlugin(config)], + compression_config=CompressionConfig(backend="gzip", minimum_size=1), + debug=True, + ) + with TestClient(app) as client: + # Request with Accept-Encoding to trigger compression + response = client.get("/", headers={"Accept-Encoding": "gzip"}) + assert response.status_code == 200 + # At the TestClient level we see an uncompressed body with the toolbar injected. + assert b"debug-toolbar" in response.content + assert b"" in response.content + + def test_toolbar_injected_without_compression(self) -> None: + """Test that toolbar injection still works without compression.""" + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[html_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + + def test_invalid_gzip_data_with_gzip_header(self) -> None: + """Test handling of invalid gzip data with content-encoding: gzip header. + + When the response claims to be gzipped but contains invalid gzip data, + the middleware should gracefully fall back to treating it as uncompressed. + """ + + @get("/invalid-gzip", media_type=MediaType.HTML) + async def invalid_gzip_handler() -> Response: + """Return invalid gzip data with gzip content-encoding header.""" + # This is not valid gzip data + invalid_gzip = b"This is not gzipped data but pretends to be" + return Response( + content=invalid_gzip, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "gzip"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[invalid_gzip_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/invalid-gzip") + assert response.status_code == 200 + # Should return original content since it couldn't be decompressed + assert b"This is not gzipped data but pretends to be" in response.content + + def test_gzip_decompressed_data_fails_utf8_decoding(self) -> None: + """Test handling of valid gzip data that fails UTF-8 decoding after decompression. + + When gzipped data decompresses successfully but contains non-UTF-8 bytes, + the middleware should return the original compressed data. + """ + + @get("/binary-gzip", media_type=MediaType.HTML) + async def binary_gzip_handler() -> Response: + """Return gzipped binary data that's not valid UTF-8.""" + # Binary data that's not valid UTF-8 + binary_data = b"\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89" + gzipped = gzip.compress(binary_data) + return Response( + content=gzipped, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "gzip"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[binary_gzip_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/binary-gzip") + assert response.status_code == 200 + # Should return decompressed binary data since UTF-8 decode failed + # The middleware has removed the gzip encoding, so we check for the raw binary content + assert response.content == b"\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89" + + def test_gzip_header_case_insensitive(self) -> None: + """Test that content-encoding header matching is case-insensitive. + + The HTTP spec requires header names to be case-insensitive, so we should + handle various casings of "gzip" (e.g., "GZIP", "Gzip", "GzIp"). + """ + + @get("/gzip-upper", media_type=MediaType.HTML) + async def gzip_upper_handler() -> Response: + """Return gzipped HTML with uppercase GZIP encoding.""" + html = "

Test

" + gzipped = gzip.compress(html.encode("utf-8")) + return Response( + content=gzipped, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "GZIP"}, + ) + + @get("/gzip-mixed", media_type=MediaType.HTML) + async def gzip_mixed_handler() -> Response: + """Return gzipped HTML with mixed case GzIp encoding.""" + html = "

Test

" + gzipped = gzip.compress(html.encode("utf-8")) + return Response( + content=gzipped, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "GzIp"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[gzip_upper_handler, gzip_mixed_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + # Test uppercase GZIP + response = client.get("/gzip-upper") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content + + # Test mixed case GzIp + response = client.get("/gzip-mixed") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content + + +class TestMultipleCompressionFormats: + """Test toolbar injection with various compression formats.""" + + def test_toolbar_injected_with_deflate_compression(self) -> None: + """Test that toolbar is correctly injected when response is deflate-compressed.""" + import zlib + + @get("/deflate", media_type=MediaType.HTML) + async def deflate_handler() -> Response: + """Return deflate-compressed HTML.""" + html = "

Test

" + compressed = zlib.compress(html.encode("utf-8")) + return Response( + content=compressed, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "deflate"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[deflate_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/deflate") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content + + def test_toolbar_injected_with_brotli_compression(self) -> None: + """Test that toolbar is correctly injected when response is brotli-compressed.""" + try: + import brotli + except ImportError: + pytest.skip("brotli library not available") + + @get("/brotli", media_type=MediaType.HTML) + async def brotli_handler() -> Response: + """Return brotli-compressed HTML.""" + html = "

Test

" + compressed = brotli.compress(html.encode("utf-8")) + return Response( + content=compressed, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "br"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[brotli_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/brotli") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content + + def test_toolbar_injected_with_zstd_compression(self) -> None: + """Test that toolbar is correctly injected when response is zstd-compressed.""" + try: + import zstandard + except ImportError: + pytest.skip("zstandard library not available") + + @get("/zstd", media_type=MediaType.HTML) + async def zstd_handler() -> Response: + """Return zstd-compressed HTML.""" + html = "

Test

" + cctx = zstandard.ZstdCompressor() + compressed = cctx.compress(html.encode("utf-8")) + return Response( + content=compressed, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "zstd"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[zstd_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/zstd") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content + + def test_toolbar_with_multiple_encodings(self) -> None: + """Test handling of multiple comma-separated encodings.""" + + @get("/multi-encoding", media_type=MediaType.HTML) + async def multi_encoding_handler() -> Response: + """Return HTML with multiple encodings applied.""" + html = "

Test

" + # Apply gzip first + compressed = gzip.compress(html.encode("utf-8")) + return Response( + content=compressed, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "gzip, identity"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[multi_encoding_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/multi-encoding") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content + + def test_invalid_deflate_data_with_deflate_header(self) -> None: + """Test handling of invalid deflate data with content-encoding: deflate header.""" + + @get("/invalid-deflate", media_type=MediaType.HTML) + async def invalid_deflate_handler() -> Response: + """Return invalid deflate data with deflate content-encoding header.""" + invalid_deflate = b"This is not deflated data but pretends to be" + return Response( + content=invalid_deflate, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "deflate"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[invalid_deflate_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/invalid-deflate") + assert response.status_code == 200 + # Should return original content since it couldn't be decompressed + assert b"This is not deflated data but pretends to be" in response.content + + def test_brotli_without_library(self) -> None: + """Test that responses with brotli encoding are handled gracefully without the library.""" + try: + import brotli # noqa: F401 + + pytest.skip("brotli library is available, can't test missing library case") + except ImportError: + pass + + @get("/brotli-missing", media_type=MediaType.HTML) + async def brotli_missing_handler() -> Response: + """Return content with br encoding when library is not available.""" + # Since brotli isn't available, we can't actually compress it + # This simulates a response claiming to be brotli-compressed + html = "

Test

" + return Response( + content=html.encode("utf-8"), + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "br"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[brotli_missing_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/brotli-missing") + assert response.status_code == 200 + # Should handle gracefully and attempt to inject toolbar + # (will work since the data isn't actually compressed) + assert b"Test" in response.content