diff --git a/src/proxy_app/main.py b/src/proxy_app/main.py index 2eeb27f0..46aae2c4 100644 --- a/src/proxy_app/main.py +++ b/src/proxy_app/main.py @@ -137,6 +137,7 @@ from rotator_library.model_info_service import init_model_info_service from proxy_app.request_logger import log_request_to_console, redact_sensitive_data from proxy_app.security_config import get_cors_settings, validate_secret_settings + from proxy_app.stream_usage import StreamUsageTracker from proxy_app.batch_manager import EmbeddingBatcher from proxy_app.api_token_auth import ApiActor, get_api_actor, require_admin_api_actor from proxy_app.detailed_logger import RawIOLogger @@ -803,12 +804,11 @@ async def streaming_response_wrapper( Wraps a streaming response to log the full response after completion and ensures any errors during the stream are sent to the client. """ - response_chunks = [] - full_response = {} - usage_data = None + tracker = StreamUsageTracker(model=request_data.get("model")) + full_response = tracker.build_logging_payload() status_code = 200 stream_error: Exception | None = None - model = request_data.get("model") + model = tracker.model try: async for chunk_str in response_stream: @@ -821,7 +821,8 @@ async def streaming_response_wrapper( if content != "[DONE]": try: chunk_data = json.loads(content) - response_chunks.append(chunk_data) + if isinstance(chunk_data, dict): + tracker.ingest_chunk(chunk_data) if logger: logger.log_stream_chunk(chunk_data) except json.JSONDecodeError: @@ -848,123 +849,10 @@ async def streaming_response_wrapper( ) return # Stop further processing finally: - if response_chunks: - # --- Aggregation Logic --- - final_message = {"role": "assistant"} - aggregated_tool_calls = {} - usage_data = None - finish_reason = None - - for chunk in response_chunks: - if "choices" in chunk and chunk["choices"]: - choice = chunk["choices"][0] - delta = choice.get("delta", {}) - - # Dynamically aggregate all fields from the delta - for key, value in delta.items(): - if value is None: - continue - - if key == "content": - if "content" not in final_message: - final_message["content"] = "" - if value: - final_message["content"] += value - - elif key == "tool_calls": - for tc_chunk in value: - index = tc_chunk["index"] - if index not in aggregated_tool_calls: - aggregated_tool_calls[index] = { - "type": "function", - "function": {"name": "", "arguments": ""}, - } - # Ensure 'function' key exists for this index before accessing its sub-keys - if "function" not in aggregated_tool_calls[index]: - aggregated_tool_calls[index]["function"] = { - "name": "", - "arguments": "", - } - if tc_chunk.get("id"): - aggregated_tool_calls[index]["id"] = tc_chunk["id"] - if "function" in tc_chunk: - if "name" in tc_chunk["function"]: - if tc_chunk["function"]["name"] is not None: - aggregated_tool_calls[index]["function"][ - "name" - ] += tc_chunk["function"]["name"] - if "arguments" in tc_chunk["function"]: - if ( - tc_chunk["function"]["arguments"] - is not None - ): - aggregated_tool_calls[index]["function"][ - "arguments" - ] += tc_chunk["function"]["arguments"] - - elif key == "function_call": - if "function_call" not in final_message: - final_message["function_call"] = { - "name": "", - "arguments": "", - } - if "name" in value: - if value["name"] is not None: - final_message["function_call"]["name"] += value[ - "name" - ] - if "arguments" in value: - if value["arguments"] is not None: - final_message["function_call"]["arguments"] += ( - value["arguments"] - ) - - else: # Generic key handling for other data like 'reasoning' - # FIX: Role should always replace, never concatenate - if key == "role": - final_message[key] = value - elif key not in final_message: - final_message[key] = value - elif isinstance(final_message.get(key), str): - final_message[key] += value - else: - final_message[key] = value - - if "finish_reason" in choice and choice["finish_reason"]: - finish_reason = choice["finish_reason"] - - if "usage" in chunk and chunk["usage"]: - usage_data = chunk["usage"] - - # --- Final Response Construction --- - if aggregated_tool_calls: - final_message["tool_calls"] = list(aggregated_tool_calls.values()) - # CRITICAL FIX: Override finish_reason when tool_calls exist - # This ensures OpenCode and other agentic systems continue the conversation loop - finish_reason = "tool_calls" - - # Ensure standard fields are present for consistent logging - for field in ["content", "tool_calls", "function_call"]: - if field not in final_message: - final_message[field] = None - - first_chunk = response_chunks[0] - final_choice = { - "index": 0, - "message": final_message, - "finish_reason": finish_reason, - } - - full_response = { - "id": first_chunk.get("id"), - "object": "chat.completion", - "created": first_chunk.get("created"), - "model": first_chunk.get("model"), - "choices": [final_choice], - "usage": usage_data, - } - model = full_response.get("model") or model - request_id = _resolve_request_id(request, full_response.get("id") or request_id) + full_response = tracker.build_logging_payload() + usage_data = tracker.usage + model = tracker.model or model + request_id = _resolve_request_id(request, tracker.response_id or request_id) if logger: logger.log_final_response( diff --git a/src/proxy_app/routers/ui.py b/src/proxy_app/routers/ui.py index c4b50557..00e890be 100644 --- a/src/proxy_app/routers/ui.py +++ b/src/proxy_app/routers/ui.py @@ -29,6 +29,7 @@ from proxy_app.db import hash_password from proxy_app.db_models import ApiKey, User from proxy_app.usage_queries import ( + fetch_api_key_last_used_map, fetch_usage_by_day, fetch_usage_by_model, fetch_usage_summary, @@ -96,6 +97,14 @@ async def _load_me_context( select(ApiKey).where(ApiKey.user_id == user_id).order_by(ApiKey.created_at.desc()) ) api_keys = list(rows) + derived_last_used = await fetch_api_key_last_used_map( + session, + user_id=user_id, + api_key_ids=[key.id for key in api_keys], + ) + for key in api_keys: + key.last_used_at = derived_last_used.get(key.id, key.last_used_at) + usage_summary = await fetch_usage_summary(session, user_id=user_id) usage_by_day = await fetch_usage_by_day(session, user_id=user_id, days=days) return { diff --git a/src/proxy_app/routers/user_api.py b/src/proxy_app/routers/user_api.py index a435a891..a7a1df1e 100644 --- a/src/proxy_app/routers/user_api.py +++ b/src/proxy_app/routers/user_api.py @@ -10,6 +10,7 @@ from proxy_app.api_token_auth import hash_api_token from proxy_app.db_models import ApiKey, User from proxy_app.usage_queries import ( + fetch_api_key_last_used_map, fetch_usage_by_day, fetch_usage_by_model, fetch_usage_summary, @@ -105,11 +106,19 @@ async def list_my_api_keys( current_user: SessionUser = Depends(require_user), session: AsyncSession = Depends(get_db_session), ) -> ApiKeyListResponse: - rows = await session.scalars( + rows = list( + await session.scalars( select(ApiKey) .where(ApiKey.user_id == current_user.id) .order_by(ApiKey.created_at.desc()) + ) + ) + derived_last_used = await fetch_api_key_last_used_map( + session, + user_id=current_user.id, + api_key_ids=[row.id for row in rows], ) + return ApiKeyListResponse( api_keys=[ ApiKeyItem( @@ -117,7 +126,7 @@ async def list_my_api_keys( name=row.name, token_prefix=row.token_prefix, created_at=row.created_at, - last_used_at=row.last_used_at, + last_used_at=derived_last_used.get(row.id, row.last_used_at), revoked_at=row.revoked_at, expires_at=row.expires_at, ) diff --git a/src/proxy_app/stream_usage.py b/src/proxy_app/stream_usage.py new file mode 100644 index 00000000..83c16a14 --- /dev/null +++ b/src/proxy_app/stream_usage.py @@ -0,0 +1,40 @@ +from dataclasses import dataclass +from typing import Any + + +@dataclass +class StreamUsageTracker: + response_id: str | None = None + model: str | None = None + created: int | None = None + usage: dict[str, Any] | None = None + + def ingest_chunk(self, chunk_data: dict[str, Any]) -> None: + if self.response_id is None: + response_id = chunk_data.get("id") + if isinstance(response_id, str): + self.response_id = response_id + + if self.model is None: + model = chunk_data.get("model") + if isinstance(model, str): + self.model = model + + if self.created is None: + created = chunk_data.get("created") + if isinstance(created, int): + self.created = created + + usage = chunk_data.get("usage") + if isinstance(usage, dict) and usage: + self.usage = usage + + def build_logging_payload(self) -> dict[str, Any]: + return { + "id": self.response_id, + "object": "chat.completion", + "created": self.created, + "model": self.model, + "choices": [], + "usage": self.usage, + } diff --git a/src/proxy_app/templates/me.html b/src/proxy_app/templates/me.html index 38238fcc..ada6c8f9 100644 --- a/src/proxy_app/templates/me.html +++ b/src/proxy_app/templates/me.html @@ -40,6 +40,7 @@
{{ key.token_prefix }}