From 3f1fdf6f428c5289d8ac780700e9a59bf0a7fa2a Mon Sep 17 00:00:00 2001 From: Andrej Simurka Date: Mon, 19 Jan 2026 12:17:41 +0100 Subject: [PATCH 1/2] Added RAG chunks extraction to v2/streaming_query --- src/app/endpoints/query_v2.py | 51 ++++++++++++++++--------- src/app/endpoints/streaming_query_v2.py | 11 ++++-- src/utils/endpoints.py | 2 +- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/src/app/endpoints/query_v2.py b/src/app/endpoints/query_v2.py index 1e5abc45..1ab4442d 100644 --- a/src/app/endpoints/query_v2.py +++ b/src/app/endpoints/query_v2.py @@ -83,13 +83,16 @@ def _build_tool_call_summary( # pylint: disable=too-many-return-statements,too-many-branches output_item: OpenAIResponseOutput, + rag_chunks: list[RAGChunk], ) -> tuple[Optional[ToolCallSummary], Optional[ToolResultSummary]]: """Translate Responses API tool outputs into ToolCallSummary and ToolResultSummary records. Processes OpenAI response output items and extracts tool call and result information. + Also parses RAG chunks from file_search_call items and appends them to the provided list. 
Args: output_item: An OpenAIResponseOutput item from the response.output array + rag_chunks: List to append extracted RAG chunks to (from file_search_call items) Returns: A tuple of (ToolCallSummary, ToolResultSummary) one of them possibly None @@ -97,7 +100,7 @@ def _build_tool_call_summary( # pylint: disable=too-many-return-statements,too- Supported tool types: - function_call: Function tool calls with parsed arguments (no result) - - file_search_call: File search operations with results + - file_search_call: File search operations with results (also extracts RAG chunks) - web_search_call: Web search operations (incomplete) - mcp_call: MCP calls with server labels - mcp_list_tools: MCP server tool listings @@ -120,6 +123,7 @@ def _build_tool_call_summary( # pylint: disable=too-many-return-statements,too- if item_type == "file_search_call": item = cast(OpenAIResponseOutputMessageFileSearchToolCall, output_item) + extract_rag_chunks_from_file_search_item(item, rag_chunks) response_payload: Optional[dict[str, Any]] = None if item.results is not None: response_payload = { @@ -431,12 +435,13 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche llm_response = "" tool_calls: list[ToolCallSummary] = [] tool_results: list[ToolResultSummary] = [] + rag_chunks: list[RAGChunk] = [] for output_item in response.output: message_text = extract_text_from_response_output_item(output_item) if message_text: llm_response += message_text - tool_call, tool_result = _build_tool_call_summary(output_item) + tool_call, tool_result = _build_tool_call_summary(output_item, rag_chunks) if tool_call: tool_calls.append(tool_call) if tool_result: @@ -448,9 +453,6 @@ async def retrieve_response( # pylint: disable=too-many-locals,too-many-branche len(llm_response), ) - # Extract rag chunks - rag_chunks = parse_rag_chunks_from_responses_api(response) - summary = TurnSummary( llm_response=llm_response, tool_calls=tool_calls, @@ -479,7 +481,27 @@ async def 
retrieve_response( # pylint: disable=too-many-locals,too-many-branche ) -def parse_rag_chunks_from_responses_api(response_obj: Any) -> list[RAGChunk]: +def extract_rag_chunks_from_file_search_item( + item: OpenAIResponseOutputMessageFileSearchToolCall, + rag_chunks: list[RAGChunk], +) -> None: + """Extract RAG chunks from a file search tool call item and append to rag_chunks. + + Args: + item: The file search tool call item. + rag_chunks: List to append extracted RAG chunks to. + """ + if item.results is not None: + for result in item.results: + rag_chunk = RAGChunk( + content=result.text, source="file_search", score=result.score + ) + rag_chunks.append(rag_chunk) + + +def parse_rag_chunks_from_responses_api( + response_obj: OpenAIResponseObject, +) -> list[RAGChunk]: """ Extract rag_chunks from the llama-stack OpenAI response. @@ -489,20 +511,13 @@ def parse_rag_chunks_from_responses_api(response_obj: Any) -> list[RAGChunk]: Returns: List of RAGChunk with content, source, score """ - rag_chunks = [] + rag_chunks: list[RAGChunk] = [] for output_item in response_obj.output: - if ( - hasattr(output_item, "type") - and output_item.type == "file_search_call" - and hasattr(output_item, "results") - ): - - for result in output_item.results: - rag_chunk = RAGChunk( - content=result.text, source="file_search", score=result.score - ) - rag_chunks.append(rag_chunk) + item_type = getattr(output_item, "type", None) + if item_type == "file_search_call": + item = cast(OpenAIResponseOutputMessageFileSearchToolCall, output_item) + extract_rag_chunks_from_file_search_item(item, rag_chunks) return rag_chunks diff --git a/src/app/endpoints/streaming_query_v2.py b/src/app/endpoints/streaming_query_v2.py index c947b208..bdeccb0e 100644 --- a/src/app/endpoints/streaming_query_v2.py +++ b/src/app/endpoints/streaming_query_v2.py @@ -70,7 +70,7 @@ ) from utils.token_counter import TokenCounter from utils.transcripts import store_transcript -from utils.types import TurnSummary +from 
utils.types import RAGChunk, TurnSummary logger = logging.getLogger("app.endpoints.handlers") router = APIRouter(tags=["streaming_query_v1"]) @@ -143,6 +143,9 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat # Track the latest response object from response.completed event latest_response_object: Optional[Any] = None + # RAG chunks + rag_chunks: list[RAGChunk] = [] + logger.debug("Starting streaming response (Responses API) processing") async for chunk in turn_response: @@ -198,7 +201,9 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat ) if done_chunk.item.type == "message": continue - tool_call, tool_result = _build_tool_call_summary(done_chunk.item) + tool_call, tool_result = _build_tool_call_summary( + done_chunk.item, rag_chunks + ) if tool_call: summary.tool_calls.append(tool_call) yield stream_event( @@ -321,7 +326,7 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat is_transcripts_enabled_func=is_transcripts_enabled, store_transcript_func=store_transcript, persist_user_conversation_details_func=persist_user_conversation_details, - rag_chunks=[], # Responses API uses empty list for rag_chunks + rag_chunks=[rag_chunk.model_dump() for rag_chunk in rag_chunks], ) return response_generator diff --git a/src/utils/endpoints.py b/src/utils/endpoints.py index 0db9d503..b0b49917 100644 --- a/src/utils/endpoints.py +++ b/src/utils/endpoints.py @@ -747,7 +747,7 @@ async def cleanup_after_streaming( is_transcripts_enabled_func: Function to check if transcripts are enabled store_transcript_func: Function to store transcript persist_user_conversation_details_func: Function to persist conversation details - rag_chunks: Optional RAG chunks dict (for Agent API, None for Responses API) + rag_chunks: Optional RAG chunks dict """ # Store transcript if enabled if not is_transcripts_enabled_func(): From d7d11c709f72a428552c69efd0345bf158994ae7 Mon Sep 17 00:00:00 2001 From: Andrej 
Simurka Date: Mon, 19 Jan 2026 12:31:39 +0100 Subject: [PATCH 2/2] Reformatted by black --- dev-tools/mcp-mock-server/server.py | 1 - src/client.py | 1 - src/quota/quota_limiter.py | 1 - src/utils/mcp_headers.py | 1 - .../e2e/features/steps/llm_query_response.py | 1 - tests/unit/app/endpoints/test_a2a.py | 1 - .../app/endpoints/test_streaming_query.py | 1 - tests/unit/cache/test_postgres_cache.py | 1 - tests/unit/models/rlsapi/test_requests.py | 1 - tests/unit/models/rlsapi/test_responses.py | 1 - tests/unit/test_configuration.py | 60 +++++++------------ 11 files changed, 20 insertions(+), 50 deletions(-) diff --git a/dev-tools/mcp-mock-server/server.py b/dev-tools/mcp-mock-server/server.py index c2604e18..b7e17fff 100644 --- a/dev-tools/mcp-mock-server/server.py +++ b/dev-tools/mcp-mock-server/server.py @@ -25,7 +25,6 @@ from pathlib import Path from typing import Any - # Global storage for captured headers (last request) last_headers: dict[str, str] = {} request_log: list = [] diff --git a/src/client.py b/src/client.py index fd7e3d1d..95020ef5 100644 --- a/src/client.py +++ b/src/client.py @@ -11,7 +11,6 @@ from models.config import LlamaStackConfiguration from utils.types import Singleton - logger = logging.getLogger(__name__) diff --git a/src/quota/quota_limiter.py b/src/quota/quota_limiter.py index f4f9d006..79432f3b 100644 --- a/src/quota/quota_limiter.py +++ b/src/quota/quota_limiter.py @@ -42,7 +42,6 @@ from quota.connect_pg import connect_pg from quota.connect_sqlite import connect_sqlite - logger = get_logger(__name__) diff --git a/src/utils/mcp_headers.py b/src/utils/mcp_headers.py index 46bd0d5c..c0f8d9d5 100644 --- a/src/utils/mcp_headers.py +++ b/src/utils/mcp_headers.py @@ -8,7 +8,6 @@ from configuration import AppConfig - logger = logging.getLogger("app.endpoints.dependencies") diff --git a/tests/e2e/features/steps/llm_query_response.py b/tests/e2e/features/steps/llm_query_response.py index 14580aa6..732f6e29 100644 ---
a/tests/e2e/features/steps/llm_query_response.py +++ b/tests/e2e/features/steps/llm_query_response.py @@ -6,7 +6,6 @@ from behave.runner import Context from tests.e2e.utils.utils import replace_placeholders - DEFAULT_LLM_TIMEOUT = 60 diff --git a/tests/unit/app/endpoints/test_a2a.py b/tests/unit/app/endpoints/test_a2a.py index 9a7dd67a..a6ac3db2 100644 --- a/tests/unit/app/endpoints/test_a2a.py +++ b/tests/unit/app/endpoints/test_a2a.py @@ -39,7 +39,6 @@ from configuration import AppConfig from models.config import Action - # User ID must be proper UUID MOCK_AUTH = ( "00000001-0001-0001-0001-000000000001", diff --git a/tests/unit/app/endpoints/test_streaming_query.py b/tests/unit/app/endpoints/test_streaming_query.py index 304c43cc..aad26903 100644 --- a/tests/unit/app/endpoints/test_streaming_query.py +++ b/tests/unit/app/endpoints/test_streaming_query.py @@ -43,7 +43,6 @@ from tests.unit.utils.auth_helpers import mock_authorization_resolvers from utils.token_counter import TokenCounter - # Note: content_delta module doesn't exist in llama-stack-client 0.3.x # These are mock classes for backward compatibility with Agent API tests # pylint: disable=too-few-public-methods,redefined-builtin diff --git a/tests/unit/cache/test_postgres_cache.py b/tests/unit/cache/test_postgres_cache.py index b720de61..2855c9da 100644 --- a/tests/unit/cache/test_postgres_cache.py +++ b/tests/unit/cache/test_postgres_cache.py @@ -18,7 +18,6 @@ from cache.cache_error import CacheError from cache.postgres_cache import PostgresCache - USER_ID_1 = suid.get_suid() USER_ID_2 = suid.get_suid() CONVERSATION_ID_1 = suid.get_suid() diff --git a/tests/unit/models/rlsapi/test_requests.py b/tests/unit/models/rlsapi/test_requests.py index a3218d4a..0b72a1a5 100644 --- a/tests/unit/models/rlsapi/test_requests.py +++ b/tests/unit/models/rlsapi/test_requests.py @@ -15,7 +15,6 @@ RlsapiV1Terminal, ) - # --------------------------------------------------------------------------- # Fixtures # 
--------------------------------------------------------------------------- diff --git a/tests/unit/models/rlsapi/test_responses.py b/tests/unit/models/rlsapi/test_responses.py index 4fe72dc7..1de1886e 100644 --- a/tests/unit/models/rlsapi/test_responses.py +++ b/tests/unit/models/rlsapi/test_responses.py @@ -12,7 +12,6 @@ ) from models.responses import AbstractSuccessfulResponse - # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- diff --git a/tests/unit/test_configuration.py b/tests/unit/test_configuration.py index b2083a03..019a2ba0 100644 --- a/tests/unit/test_configuration.py +++ b/tests/unit/test_configuration.py @@ -250,8 +250,7 @@ def test_load_proper_configuration(tmpdir: Path) -> None: """Test loading proper configuration from YAML file.""" cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: foo bar baz service: host: localhost @@ -267,8 +266,7 @@ def test_load_proper_configuration(tmpdir: Path) -> None: user_data_collection: feedback_enabled: false mcp_servers: [] - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -282,8 +280,7 @@ def test_load_configuration_with_mcp_servers(tmpdir: Path) -> None: """Test loading configuration from YAML file with MCP servers.""" cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: test service service: host: localhost @@ -304,8 +301,7 @@ def test_load_configuration_with_mcp_servers(tmpdir: Path) -> None: - name: git-server provider_id: custom-git-provider url: https://git.example.com/mcp - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -446,8 +442,7 @@ def test_load_configuration_with_customization_system_prompt_path(tmpdir: Path) cfg_filename = tmpdir / "config.yaml" with 
open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - f""" + fout.write(f""" name: test service service: host: localhost @@ -471,8 +466,7 @@ def test_load_configuration_with_customization_system_prompt_path(tmpdir: Path) customization: disable_query_system_prompt: true system_prompt_path: {system_prompt_filename} - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -486,8 +480,7 @@ def test_load_configuration_with_customization_system_prompt(tmpdir: Path) -> No """Test loading configuration from YAML file with system_prompt in the customization.""" cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: test service service: host: localhost @@ -511,8 +504,7 @@ def test_load_configuration_with_customization_system_prompt(tmpdir: Path) -> No customization: system_prompt: |- this is system prompt in the customization section - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -531,8 +523,7 @@ def test_configuration_with_profile_customization(tmpdir: Path) -> None: expected_prompts = expected_profile.get_prompts() cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: test service service: host: localhost @@ -549,8 +540,7 @@ def test_configuration_with_profile_customization(tmpdir: Path) -> None: feedback_enabled: false customization: profile_path: tests/profiles/test/profile.py - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -574,8 +564,7 @@ def test_configuration_with_all_customizations(tmpdir: Path) -> None: cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - f""" + fout.write(f""" name: test service service: host: localhost @@ -594,8 +583,7 @@ def test_configuration_with_all_customizations(tmpdir: Path) -> None: profile_path: 
tests/profiles/test/profile.py system_prompt: custom prompt system_prompt_path: {system_prompt_filename} - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -613,8 +601,7 @@ def test_configuration_with_sqlite_conversation_cache(tmpdir: Path) -> None: """Test loading configuration from YAML file with conversation cache configuration.""" cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: test service service: host: localhost @@ -633,8 +620,7 @@ def test_configuration_with_sqlite_conversation_cache(tmpdir: Path) -> None: type: "sqlite" sqlite: db_path: ":memory:" - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -652,8 +638,7 @@ def test_configuration_with_in_memory_conversation_cache(tmpdir: Path) -> None: """Test loading configuration from YAML file with conversation cache configuration.""" cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: test service service: host: localhost @@ -672,8 +657,7 @@ def test_configuration_with_in_memory_conversation_cache(tmpdir: Path) -> None: type: "memory" memory: max_entries: 42 - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -691,8 +675,7 @@ def test_configuration_with_quota_handlers_no_storage(tmpdir: Path) -> None: """Test loading configuration from YAML file with quota handlers configuration.""" cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: test service service: host: localhost @@ -722,8 +705,7 @@ def test_configuration_with_quota_handlers_no_storage(tmpdir: Path) -> None: scheduler: # scheduler ticks in seconds period: 1 - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename)) @@ -745,8 +727,7 @@ def test_configuration_with_quota_handlers(tmpdir: Path) -> 
None: """Test loading configuration from YAML file with quota handlers configuration.""" cfg_filename = tmpdir / "config.yaml" with open(cfg_filename, "w", encoding="utf-8") as fout: - fout.write( - """ + fout.write(""" name: test service service: host: localhost @@ -778,8 +759,7 @@ def test_configuration_with_quota_handlers(tmpdir: Path) -> None: scheduler: # scheduler ticks in seconds period: 1 - """ - ) + """) cfg = AppConfig() cfg.load_configuration(str(cfg_filename))