From a77a60f5dd45b3fa3371f874958f349425e54e38 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Fri, 16 Jan 2026 17:56:15 -0500 Subject: [PATCH 01/10] WIp trying to make it work --- .gitignore | 1 + .../agent_evals/report_generation/README.md | 11 + .../report_generation/async_client_manager.py | 110 +++++++++ .../agent_evals/report_generation/main.py | 99 +++++++++ .../agent_evals/report_generation/utils.py | 210 ++++++++++++++++++ .../agent_evals/report_generation/weaviate.py | 210 ++++++++++++++++++ 6 files changed, 641 insertions(+) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/README.md create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/main.py create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/utils.py create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py diff --git a/.gitignore b/.gitignore index ad52a41..c716ed5 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ wheels/ **.ipynb_checkpoints .env +.gradio diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md new file mode 100644 index 0000000..aa66aaa --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md @@ -0,0 +1,11 @@ +# Report Generation Agent + +## Dataset + +https://www.kaggle.com/datasets/yasserh/instacart-online-grocery-basket-analysis-dataset?resource=download + +## Running + +```bash +uv run --env-file .env python -m aieng.agent_evals.report_generation.main +`` diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py new file mode 100644 index 0000000..911c838 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py @@ -0,0 +1,110 @@ +"""Async client lifecycle manager for Gradio applications. + +Provides idempotent initialization and proper cleanup of async clients +like Weaviate and OpenAI to prevent event loop conflicts during Gradio's +hot-reload process. +""" + +from aieng.agent_evals.report_generation.utils import Configs +from aieng.agent_evals.report_generation.weaviate import ( + AsyncWeaviateKnowledgeBase, + get_weaviate_async_client, +) +from openai import AsyncOpenAI +from weaviate.client import WeaviateAsyncClient + + +class AsyncClientManager: + """Manages async client lifecycle with lazy initialization and cleanup. + + This class ensures clients are created only once and properly closed, + preventing ResourceWarning errors from unclosed event loops. + + Parameters + ---------- + configs: Configs | None, optional, default=None + Configuration object for client setup. If None, a new ``Configs()`` is created. + + Examples + -------- + >>> manager = AsyncClientManager() + >>> # Access clients (created on first access) + >>> weaviate = manager.weaviate_client + >>> kb = manager.knowledgebase + >>> openai = manager.openai_client + >>> # In finally block or cleanup + >>> await manager.close() + """ + + _singleton_instance: "AsyncClientManager | None" = None + + @classmethod + def get_instance(cls) -> "AsyncClientManager": + """Get the singleton instance of the client manager. + + Returns + ------- + The singleton instance of the client manager. + """ + if cls._singleton_instance is None: + cls._singleton_instance = AsyncClientManager() + return cls._singleton_instance + + def __init__(self, configs: Configs | None = None) -> None: + """Initialize manager with optional configs.""" + self._configs: Configs | None = configs + self._weaviate_client: WeaviateAsyncClient | None = None + self._openai_client: AsyncOpenAI | None = None + self._knowledgebase: AsyncWeaviateKnowledgeBase | None = None + self._initialized: bool = False + + @property + def configs(self) -> Configs: + """Get or create configs instance.""" + if self._configs is None: + self._configs = Configs() # pyright: ignore[reportCallIssue] + return self._configs + + @property + def openai_client(self) -> AsyncOpenAI: + """Get or create OpenAI client.""" + if self._openai_client is None: + self._openai_client = AsyncOpenAI() + self._initialized = True + return self._openai_client + + @property + def weaviate_client(self) -> WeaviateAsyncClient: + """Get or create Weaviate client.""" + if self._weaviate_client is None: + self._weaviate_client = get_weaviate_async_client(self.configs) + self._initialized = True + return self._weaviate_client + + @property + def knowledgebase(self) -> AsyncWeaviateKnowledgeBase: + """Get or create knowledge base instance.""" + if self._knowledgebase is None: + self._knowledgebase = AsyncWeaviateKnowledgeBase( + self.weaviate_client, + collection_name=self.configs.weaviate_collection_name, + ) + self._initialized = True + return self._knowledgebase + + async def close(self) -> None: + """Close all initialized async clients.""" + if self._weaviate_client is not None: + await self._weaviate_client.close() + self._weaviate_client = None + + if self._openai_client is not None: + await self._openai_client.close() + self._openai_client = None + + self._knowledgebase = None + self._initialized = False + + def is_initialized(self) -> bool: + """Check if any clients have been initialized.""" + return self._initialized diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py new file mode 100644 index 0000000..57be812 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py @@ -0,0 +1,99 @@ +"""Reason-and-Act Knowledge Retrieval Agent via the OpenAI Agent SDK.""" + +import asyncio +import logging +from typing import Any, AsyncGenerator + +import agents +import gradio as gr +from aieng.agent_evals.report_generation.async_client_manager import AsyncClientManager +from aieng.agent_evals.report_generation.utils import ( + get_or_create_session, + oai_agent_stream_to_gradio_messages, +) +from dotenv import load_dotenv +from gradio.components.chatbot import ChatMessage + + +REACT_INSTRUCTIONS = """\ +Perform the task using the search tool. \ +EACH TIME before invoking the function, you must explain your reasons for doing so. \ +Be sure to mention the sources in your response. \ +If the search tool did not return intended results, try again. \ +For best performance, divide complex queries into simpler sub-queries. \ +Do not make up information. \ +For facts that might change over time, you must use the search tool to retrieve the \ +most up-to-date information. +""" + + +async def _main( + query: str, + history: list[ChatMessage], + session_state: dict[str, Any], +) -> AsyncGenerator[list[ChatMessage], Any]: + # Initialize list of chat messages for a single turn + turn_messages: list[ChatMessage] = [] + + # Construct an in-memory SQLite session for the agent to maintain + # conversation history across multiple turns of a chat + # This makes it possible to ask follow-up questions that refer to + # previous turns in the conversation + session = get_or_create_session(history, session_state) + + # Get the client manager singleton instance + client_manager = AsyncClientManager.get_instance() + + # Define an agent using the OpenAI Agent SDK + main_agent = agents.Agent( + name="Report Generation Agent", # Agent name for logging and debugging purposes + instructions=REACT_INSTRUCTIONS, # System instructions for the agent + # Tools available to the agent + # We wrap the `search_knowledgebase` method with `function_tool`, which + # will construct the tool definition JSON schema by extracting the necessary + # information from the method signature and docstring. + tools=[agents.function_tool(client_manager.knowledgebase.search_knowledgebase)], + model=agents.OpenAIChatCompletionsModel( + model=client_manager.configs.default_worker_model, + openai_client=client_manager.openai_client, + ), + ) + + # Run the agent in streaming mode to get and display intermediate outputs + result_stream = agents.Runner.run_streamed(main_agent, input=query, session=session) + + async for _item in result_stream.stream_events(): + # Parse the stream events, convert to Gradio chat messages and append to + # the chat history + turn_messages += oai_agent_stream_to_gradio_messages(_item) + if len(turn_messages) > 0: + yield turn_messages + + +if __name__ == "__main__": + load_dotenv(verbose=True) + logging.basicConfig(level=logging.INFO) + + # Disable tracing to OpenAI platform since we are using Gemini models instead + # of OpenAI models + agents.set_tracing_disabled(disabled=True) + + demo = gr.ChatInterface( + _main, + chatbot=gr.Chatbot(height=600), + textbox=gr.Textbox(lines=1, placeholder="Enter your prompt"), + # Additional input to maintain session state across multiple turns + # NOTE: Examples must be a list of lists when additional inputs are provided + additional_inputs=gr.State(value={}, render=False), + examples=[ + [ + "Generate a report on products that customers stop buying after their third order." + ], + ], + title="2.1: ReAct for Retrieval-Augmented Generation with OpenAI Agent SDK", + ) + + try: + demo.launch(share=False) + finally: + asyncio.run(AsyncClientManager.get_instance().close()) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py b/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py new file mode 100644 index 0000000..c66d5af --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py @@ -0,0 +1,210 @@ +"""Utility functions for the report generation agent.""" + +import uuid +from typing import Any + +from agents import SQLiteSession, StreamEvent, stream_events +from agents.items import ToolCallOutputItem +from gradio.components.chatbot import ChatMessage, MetadataDict +from openai.types.responses import ResponseFunctionToolCall, ResponseOutputText +from openai.types.responses.response_completed_event import ResponseCompletedEvent +from openai.types.responses.response_output_message import ResponseOutputMessage +from pydantic import AliasChoices, Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +def oai_agent_stream_to_gradio_messages(stream_event: StreamEvent) -> list[ChatMessage]: + """Parse agent sdk "stream event" into a list of gr messages. + + Adds extra data for tool use to make the gradio display informative. + """ + output: list[ChatMessage] = [] + + if isinstance(stream_event, stream_events.RawResponsesStreamEvent): + data = stream_event.data + if isinstance(data, ResponseCompletedEvent): + # The completed event may contain multiple output messages, + # including tool calls and final outputs. + # If there is at least one tool call, we mark the response as a thought. + is_thought = len(data.response.output) > 1 and any( + isinstance(message, ResponseFunctionToolCall) + for message in data.response.output + ) + + for message in data.response.output: + if isinstance(message, ResponseOutputMessage): + for _item in message.content: + if isinstance(_item, ResponseOutputText): + output.append( + ChatMessage( + role="assistant", + content=_item.text, + metadata={ + "title": "🧠 Thought", + "id": data.sequence_number, + } + if is_thought + else MetadataDict(), + ) + ) + elif isinstance(message, ResponseFunctionToolCall): + output.append( + ChatMessage( + role="assistant", + content=f"```\n{message.arguments}\n```", + metadata={ + "title": f"🛠️ Used tool `{message.name}`", + }, + ) + ) + + elif isinstance(stream_event, stream_events.RunItemStreamEvent): + name = stream_event.name + item = stream_event.item + + if name == "tool_output" and isinstance(item, ToolCallOutputItem): + output.append( + ChatMessage( + role="assistant", + content=f"```\n{item.output}\n```", + metadata={ + "title": "*Tool call output*", + "status": "done", # This makes it collapsed by default + }, + ) + ) + + return output + + +def get_or_create_session( + history: list[ChatMessage], + session_state: dict[str, Any], +) -> SQLiteSession: + """Get existing session or create a new one for conversation persistence. + + Args: + history: The history of the conversation. + session_state: The state of the session. + + Returns + ------- + The session. + """ + if len(history) == 0: + session = SQLiteSession(session_id=str(uuid.uuid4())) + session_state["session"] = session + else: + session = session_state["session"] + return session + + +class Configs(BaseSettings): + """Configuration settings loaded from environment variables. + + This class automatically loads configuration values from environment variables + and a .env file, and provides type-safe access to all settings. It validates + environment variables on instantiation. + + Attributes + ---------- + openai_base_url : str + Base URL for OpenAI-compatible API (defaults to Gemini endpoint). + openai_api_key : str + API key for OpenAI-compatible API (accepts OPENAI_API_KEY, GEMINI_API_KEY, + or GOOGLE_API_KEY). + default_planner_model : str, default='gemini-2.5-pro' + Model name for planning tasks. This is typically a more capable and expensive + model. + default_worker_model : str, default='gemini-2.5-flash' + Model name for worker tasks. This is typically a less expensive model. + embedding_base_url : str + Base URL for embedding API service. + embedding_api_key : str + API key for embedding service. + embedding_model_name : str, default='@cf/baai/bge-m3' + Name of the embedding model. + weaviate_collection_name : str, default='enwiki_20250520' + Name of the Weaviate collection to use. + weaviate_api_key : str + API key for Weaviate cloud instance. + weaviate_http_host : str + Weaviate HTTP host (must end with .weaviate.cloud). + weaviate_grpc_host : str + Weaviate gRPC host (must start with grpc- and end with .weaviate.cloud). + weaviate_http_port : int, default=443 + Port for Weaviate HTTP connections. + weaviate_grpc_port : int, default=443 + Port for Weaviate gRPC connections. + weaviate_http_secure : bool, default=True + Use secure HTTP connection. + weaviate_grpc_secure : bool, default=True + Use secure gRPC connection. + langfuse_public_key : str + Langfuse public key (must start with pk-lf-). + langfuse_secret_key : str + Langfuse secret key (must start with sk-lf-). + langfuse_host : str, default='https://us.cloud.langfuse.com' + Langfuse host URL. + e2b_api_key : str or None + Optional E2B.dev API key for code interpreter (must start with e2b_). + default_code_interpreter_template : str or None + Optional default template name or ID for E2B.dev code interpreter. + web_search_base_url : str or None + Optional base URL for web search service. + web_search_api_key : str or None + Optional API key for web search service. + + Examples + -------- + >>> from src.utils.env_vars import Configs + >>> config = Configs() + >>> print(config.default_planner_model) + 'gemini-2.5-pro' + + Notes + ----- + Create a .env file in your project root with the required environment + variables. The class will automatically load and validate them. + """ + + model_config = SettingsConfigDict( + env_file=".env", env_file_encoding="utf-8", env_ignore_empty=True + ) + + openai_base_url: str = "https://generativelanguage.googleapis.com/v1beta/openai/" + openai_api_key: str = Field( + validation_alias=AliasChoices( + "OPENAI_API_KEY", "GEMINI_API_KEY", "GOOGLE_API_KEY" + ) + ) + + default_planner_model: str = "gemini-2.5-pro" + default_worker_model: str = "gemini-2.5-flash" + + embedding_base_url: str + embedding_api_key: str + embedding_model_name: str = "@cf/baai/bge-m3" + + weaviate_collection_name: str = "enwiki_20250520" + weaviate_api_key: str + # ends with .weaviate.cloud, or it's "localhost" + weaviate_http_host: str = Field(pattern=r"^.*\.weaviate\.cloud$|localhost") + # starts with grpc- ends with .weaviate.cloud, or it's "localhost" + weaviate_grpc_host: str = Field(pattern=r"^grpc-.*\.weaviate\.cloud$|localhost") + weaviate_http_port: int = 443 + weaviate_grpc_port: int = 443 + weaviate_http_secure: bool = True + weaviate_grpc_secure: bool = True + + langfuse_public_key: str = Field(pattern=r"^pk-lf-.*$") + langfuse_secret_key: str = Field(pattern=r"^sk-lf-.*$") + langfuse_host: str = "https://us.cloud.langfuse.com" + + # Optional E2B.dev API key for Python Code Interpreter tool + e2b_api_key: str | None = Field(default=None, pattern=r"^e2b_.*$") + default_code_interpreter_template: str | None = "9p6favrrqijhasgkq1tv" + + # Optional configs for web search tool + web_search_base_url: str | None = None + web_search_api_key: str | None = None diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py b/aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py new file mode 100644 index 0000000..a0eeaaf --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py @@ -0,0 +1,210 @@ +"""Implements knowledge retrieval tool for Weaviate.""" + +import asyncio +import logging +import os +from typing import Awaitable, Callable, TypeVar + +import backoff +import openai +import pydantic +from aieng.agent_evals.report_generation.utils import Configs +from weaviate.client import WeaviateAsyncClient +from weaviate.connect.helpers import use_async_with_custom + + +class _Source(pydantic.BaseModel): + """Type hints for the "_source" field in ES Search Results.""" + + title: str + section: str | None = None + + +class _Highlight(pydantic.BaseModel): + """Type hints for the "highlight" field in ES Search Results.""" + + text: list[str] + + +class _SearchResult(pydantic.BaseModel): + """Type hints for knowledge base search result.""" + + source: _Source = pydantic.Field(alias="_source") + highlight: _Highlight + + def __repr__(self) -> str: + return self.model_dump_json(indent=2) + + +SearchResults = list[_SearchResult] + + +class AsyncWeaviateKnowledgeBase: + """Configurable search tools for Weaviate knowledge base.""" + + def __init__( + self, + async_client: WeaviateAsyncClient, + collection_name: str, + num_results: int = 5, + snippet_length: int = 1000, + max_concurrency: int = 3, + embedding_model_name: str = "@cf/baai/bge-m3", + embedding_api_key: str | None = None, + embedding_base_url: str | None = None, + ) -> None: + self.async_client = async_client + self.collection_name = collection_name + self.num_results = num_results + self.snippet_length = snippet_length + self.logger = logging.getLogger(__name__) + self.semaphore = asyncio.Semaphore(max_concurrency) + + self.embedding_model_name = embedding_model_name + self.embedding_api_key = embedding_api_key + self.embedding_base_url = embedding_base_url + + self._embed_client = openai.OpenAI( + api_key=self.embedding_api_key or os.getenv("EMBEDDING_API_KEY"), + base_url=self.embedding_base_url or os.getenv("EMBEDDING_BASE_URL"), + max_retries=5, + ) + + @backoff.on_exception(backoff.expo, exception=asyncio.CancelledError) # type: ignore + async def search_knowledgebase(self, keyword: str) -> SearchResults: + """Search knowledge base. + + Parameters + ---------- + keyword : str + The search keyword to query the knowledge base. + + Returns + ------- + SearchResults + A list of search results. Each result contains source and highlight. + If no results are found, returns an empty list. + + Raises + ------ + Exception + If Weaviate is not ready to accept requests (HTTP 503). + + """ + async with self.async_client: + if not await self.async_client.is_ready(): + raise Exception("Weaviate is not ready to accept requests (HTTP 503).") + + collection = self.async_client.collections.get(self.collection_name) + vector = self._vectorize(keyword) + response = await rate_limited( + lambda: collection.query.hybrid( + keyword, vector=vector, limit=self.num_results + ), + semaphore=self.semaphore, + ) + + self.logger.info(f"Query: {keyword}; Returned matches: {len(response.objects)}") + + hits = [] + for obj in response.objects: + text = obj.properties.get("text", "") + assert isinstance(text, str) + + hit = { + "_source": { + "title": obj.properties.get("title", ""), + "section": obj.properties.get("section", None), + }, + "highlight": {"text": [text[: self.snippet_length]]}, + } + hits.append(hit) + + return [_SearchResult.model_validate(_hit) for _hit in hits] + + def _vectorize(self, text: str) -> list[float]: + """Vectorize text using the embedding client. + + Parameters + ---------- + text : str + The text to be vectorized. + + Returns + ------- + list[float] + A list of floats representing the vectorized text. + """ + response = self._embed_client.embeddings.create( + input=text, model=self.embedding_model_name + ) + return response.data[0].embedding + + +def get_weaviate_async_client(configs: Configs) -> WeaviateAsyncClient: + """Get an async Weaviate client. + + If no parameters are provided, the function will attempt to connect to a local + Weaviate instance using environment variables. + + Parameters + ---------- + http_host : str, optional, default=None + The HTTP host for the Weaviate instance. If not provided, defaults to the + `WEAVIATE_HTTP_HOST` environment variable or "localhost" if the environment + variable is not set. + http_port : int, optional, default=None + The HTTP port for the Weaviate instance. If not provided, defaults to the + `WEAVIATE_HTTP_PORT` environment variable or 8080 if the environment variable + is not set. + http_secure : bool, optional, default=False + Whether to use HTTPS for the HTTP connection. Defaults to the + `WEAVIATE_HTTP_SECURE` environment variable or `False` if the environment + variable is not set. + grpc_host : str, optional, default=None + The gRPC host for the Weaviate instance. If not provided, defaults to the + `WEAVIATE_GRPC_HOST` environment variable or "localhost" if the environment + variable is not set. + grpc_port : int, optional, default=None + The gRPC port for the Weaviate instance. If not provided, defaults to the + `WEAVIATE_GRPC_PORT` environment variable or 50051 if the environment variable + is not set. + grpc_secure : bool, optional, default=False + Whether to use secure gRPC. Defaults to the `WEAVIATE_GRPC_SECURE` environment + variable or `False` if the environment variable is not set. + api_key : str, optional, default=None + The API key for authentication with Weaviate. If not provided, defaults to the + `WEAVIATE_API_KEY` environment variable. + headers : dict[str, str], optional, default=None + Additional headers to include in the request. + additional_config : AdditionalConfig, optional, default=None + Additional configuration for the Weaviate client. + skip_init_checks : bool, optional, default=False + Whether to skip initialization checks. + + Returns + ------- + WeaviateAsyncClient + An asynchronous Weaviate client configured with the provided parameters. + """ + return use_async_with_custom( + http_host=configs.weaviate_http_host or "localhost", + http_port=configs.weaviate_http_port or 8080, + http_secure=configs.weaviate_http_secure or False, + grpc_host=configs.weaviate_grpc_host or "localhost", + grpc_port=configs.weaviate_grpc_port or 50051, + grpc_secure=configs.weaviate_grpc_secure or False, + auth_credentials=configs.weaviate_api_key or None, + ) + + +T = TypeVar("T") + + +async def rate_limited( + _fn: Callable[[], Awaitable[T]], + semaphore: asyncio.Semaphore, +) -> T: + """Run _fn with semaphore rate limit.""" + async with semaphore: + return await _fn() From 9e6ce2eae74aa441565a5852ebd76293ad4bf0f4 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Wed, 21 Jan 2026 11:47:00 -0500 Subject: [PATCH 02/10] Adding data import for the online retail dataset and some more instructions --- .../agent_evals/report_generation/README.md | 10 +- .../report_generation/data_import.py | 177 ++++++++++++++++++ .../agent_evals/report_generation/main.py | 4 +- .../agent_evals/report_generation/utils.py | 15 +- pyproject.toml | 2 +- 5 files changed, 191 insertions(+), 17 deletions(-) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md index aa66aaa..9166f4c 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md @@ -2,10 +2,16 @@ ## Dataset -https://www.kaggle.com/datasets/yasserh/instacart-online-grocery-basket-analysis-dataset?resource=download +https://archive.ics.uci.edu/dataset/352/online+retail + +To import it into weaviate: + +```bash +uv run --env-file .env python -m aieng.agent_evals.report_generation.data_import +``` ## Running ```bash uv run --env-file .env python -m aieng.agent_evals.report_generation.main -`` +``` diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py b/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py new file mode 100644 index 0000000..6d2a4ee --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py @@ -0,0 +1,177 @@ +""" +Import data from the Online Retail dataset into Weaviate. + +https://archive.ics.uci.edu/dataset/352/online+retail +""" + +import csv +import logging +import os +from copy import copy +from datetime import datetime +from typing import Any + +from weaviate.classes.config import Configure, DataType, Property +from weaviate.client import WeaviateClient +from weaviate.collections.collection.sync import Collection +from weaviate.connect.helpers import connect_to_local + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +DATASET_PATH = "/Users/marcelolotif/Downloads/Online Retail Sample 1k.csv" +COLLECTION_NAME = "OnlineRetail" + + +def is_numeric(value: str) -> bool: + """Check if a string represents a numeric value. + + Args: + value: The value to check. + + Returns + ------- + True if the value is a numeric value, False otherwise. + """ + try: + float(value) + return True + except ValueError: + return False + + +def type_for_column_name(column_name: str) -> DataType: + """Get the data type for a column name. + + Args: + column_name: The name of the column. + + Returns + ------- + The data type for the column. + """ + if column_name in ["InvoiceNo", "Quantity", "CustomerID"]: + return DataType.INT + if column_name == "UnitPrice": + return DataType.NUMBER + if column_name == "InvoiceDate": + return DataType.DATE + return DataType.TEXT + + +def make_data_row(column_names: list[str], row: list[str]) -> dict[str, Any] | None: + """Make a data row from a list of column names and a list of values. + + Args: + column_names: The names of the columns. + row: The values of the row. + + Returns + ------- + The data row, or None if the row is invalid. + """ + if len(row) == 0: + logger.warning(f"Skipping row because it has no values: {row}") + return None + + data_row: dict[str, Any] = {} + for i in range(len(row)): + column_name = column_names[i] + value = row[i] + + try: + data_type = type_for_column_name(column_name) + if data_type == DataType.INT: + data_row[column_name] = int(value) + elif data_type == DataType.NUMBER: + data_row[column_name] = float(value) + elif data_type == DataType.DATE: + data_row[column_name] = datetime.strptime(value, "%m/%d/%y %H:%M") + else: + data_row[column_name] = str(value) + except Exception: + logger.exception(f"Skipping row because of error: {row}") + return None + + return data_row + + +def make_collection_with_column_names(weaviate_client: WeaviateClient, column_names: list[str]) -> Collection: + """Make a collection with the column names of the rows. + + Args: + weaviate_client: The Weaviate client. + column_names: The names of the columns. + + Returns + ------- + The weaviate collection object. + """ + properties = [] + for column_name in column_names: + properties.append(Property(name=column_name, data_type=type_for_column_name(column_name))) + + return weaviate_client.collections.create( + name=COLLECTION_NAME, + vector_config=Configure.Vectors.text2vec_openai(), + properties=properties, + ) + + +if __name__ == "__main__": + weaviate_client = connect_to_local(headers={"X-OpenAI-Api-Key": os.environ["OPENAI_API_KEY"]}) + weaviate_collection = None + + with open(DATASET_PATH, "r") as file: + csv_reader = csv.reader(file) + + column_names = None + data_rows = [] + first_row = True + logger.info("Collecting data points...") + + for r in csv_reader: + row = copy(r) + if first_row: + first_row = False + + column_names = row + + if weaviate_client.collections.exists(COLLECTION_NAME): + weaviate_collection = weaviate_client.collections.use(COLLECTION_NAME) + else: + weaviate_collection = make_collection_with_column_names(weaviate_client, column_names) + + else: + if not is_numeric(row[0]): + logger.warning(f"Skipping row because it seems to be invalid: {row}") + continue + + assert column_names is not None, "Column names should not be None" + data_row = make_data_row(column_names, row) + if data_row is not None: + data_rows.append(data_row) + + assert weaviate_collection is not None, "Weaviate collection should not be None" + + logger.info("Importing data points...") + with weaviate_collection.batch.fixed_size(batch_size=100, concurrent_requests=2) as batch: + for data_row in data_rows: + batch.add_object(properties=data_row) + + # Check for failed objects + failed_objects = weaviate_collection.batch.failed_objects + if failed_objects: + print(f"Number of failed imports: {len(failed_objects)}") + for failed in failed_objects[:3]: # Show first 3 failures + print(f"Failed object: {failed}") + + # Verify client-side batch import + result = weaviate_collection.aggregate.over_all(total_count=True) + logger.info(f"Client-side batch had {len(failed_objects)} failures") + logger.info(f"Expected {len(data_rows)} objects, got {result.total_count}") + logger.info(f"✓ Client-side batch: {result.total_count} objects imported successfully") + + weaviate_client.close() diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py index 57be812..b2e3bd8 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py @@ -86,9 +86,7 @@ async def _main( # NOTE: Examples must be a list of lists when additional inputs are provided additional_inputs=gr.State(value={}, render=False), examples=[ - [ - "Generate a report on products that customers stop buying after their third order." - ], + ["Generate a monthly sales performance report for the last year that data is available."], ], title="2.1: ReAct for Retrieval-Augmented Generation with OpenAI Agent SDK", ) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py b/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py index c66d5af..8bafdbb 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py @@ -27,8 +27,7 @@ def oai_agent_stream_to_gradio_messages(stream_event: StreamEvent) -> list[ChatM # including tool calls and final outputs. # If there is at least one tool call, we mark the response as a thought. is_thought = len(data.response.output) > 1 and any( - isinstance(message, ResponseFunctionToolCall) - for message in data.response.output + isinstance(message, ResponseFunctionToolCall) for message in data.response.output ) for message in data.response.output: @@ -168,16 +167,10 @@ class Configs(BaseSettings): variables. The class will automatically load and validate them. """ - model_config = SettingsConfigDict( - env_file=".env", env_file_encoding="utf-8", env_ignore_empty=True - ) + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", env_ignore_empty=True) openai_base_url: str = "https://generativelanguage.googleapis.com/v1beta/openai/" - openai_api_key: str = Field( - validation_alias=AliasChoices( - "OPENAI_API_KEY", "GEMINI_API_KEY", "GOOGLE_API_KEY" - ) - ) + openai_api_key: str = Field(validation_alias=AliasChoices("OPENAI_API_KEY", "GEMINI_API_KEY", "GOOGLE_API_KEY")) default_planner_model: str = "gemini-2.5-pro" default_worker_model: str = "gemini-2.5-flash" @@ -187,7 +180,7 @@ class Configs(BaseSettings): embedding_model_name: str = "@cf/baai/bge-m3" weaviate_collection_name: str = "enwiki_20250520" - weaviate_api_key: str + weaviate_api_key: str | None = None # ends with .weaviate.cloud, or it's "localhost" weaviate_http_host: str = Field(pattern=r"^.*\.weaviate\.cloud$|localhost") # starts with grpc- ends with .weaviate.cloud, or it's "localhost" diff --git a/pyproject.toml b/pyproject.toml index 82aa92c..84c2685 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,7 +77,7 @@ aieng-eval-agents = { workspace = true } [tool.ruff] include = ["*.py", "pyproject.toml", "*.ipynb"] -line-length = 88 +line-length = 120 [tool.ruff.format] quote-style = "double" From 0098f7d8233e2b16cf4f3fe7ac121a6ab6ea8e02 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 22 Jan 2026 11:37:51 -0500 Subject: [PATCH 03/10] Weaviate local and remote scripts --- .gitignore | 1 + .../report_generation/async_client_manager.py | 1 + .../report_generation/create_weaviate_kb.py | 270 ++++++++++++++++++ .../report_generation/data_import.py | 191 +++++++++++-- .../report_generation/docker-compose.yaml | 13 + 5 files changed, 451 insertions(+), 25 deletions(-) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml diff --git a/.gitignore b/.gitignore index c716ed5..0b28586 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ wheels/ .env .gradio +aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py index 911c838..46cc520 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py @@ -88,6 +88,7 @@ def knowledgebase(self) -> AsyncWeaviateKnowledgeBase: self._knowledgebase = AsyncWeaviateKnowledgeBase( self.weaviate_client, collection_name=self.configs.weaviate_collection_name, + embedding_model_name=self.configs.embedding_model_name, ) self._initialized = True return self._knowledgebase diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py b/aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py new file mode 100644 index 0000000..7359c78 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py @@ -0,0 +1,270 @@ +"""Create a knowledge base in Weaviate from a HuggingFace dataset.""" + +import asyncio +import logging +import os +from typing import Any, Optional + +import click +import datasets +import weaviate.classes.config as wc +from datasets import Features, Sequence, Value, load_dataset +from dotenv import load_dotenv +from openai import AsyncOpenAI, OpenAIError +from weaviate.classes.config import DataType as WDataType +from weaviate.classes.init import Auth +from weaviate.client import WeaviateClient +from weaviate.collections.collection.sync import Collection +from weaviate.connect.helpers import connect_to_local, connect_to_weaviate_cloud + + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +load_dotenv(".env.participants") +logger = logging.getLogger(__name__) + + +SEARCHABLE_TEXT_COLUMN = "Description" + +HF_TO_WEAVIATE_TYPE_MAP = { + "string": WDataType.TEXT, + "list[string]": WDataType.TEXT_ARRAY, + "int64": WDataType.INT, + "int32": WDataType.INT, + "list[int64]": WDataType.INT_ARRAY, + "list[int32]": WDataType.INT_ARRAY, + "bool": WDataType.BOOL, + "list[bool]": WDataType.BOOL_ARRAY, + "float64": WDataType.NUMBER, + "float32": WDataType.NUMBER, + "list[float64]": WDataType.NUMBER_ARRAY, + "list[float32]": WDataType.NUMBER_ARRAY, + "timestamp[s]": WDataType.DATE, + "list[timestamp[s]]": WDataType.DATE_ARRAY, + "binary": WDataType.BLOB, + "dict": WDataType.OBJECT, +} + + +def _hf_type_to_str(feature: Any) -> str: + """Convert HuggingFace feature type to a string representation.""" + if isinstance(feature, Value): + return feature.dtype # e.g., 'string', 'int64', 'float64', 'bool', 'binary' + if isinstance(feature, Sequence): + # Recursively get the type of the inner feature + return f"list[{_hf_type_to_str(feature.feature)}]" + if isinstance(feature, list): + return f"list[{_hf_type_to_str(feature[0])}]" + if isinstance(feature, dict): + return "dict" + return str(type(feature)) + + +def _create_properties(features: Features) -> list[wc.Property]: + """Create Weaviate properties from HuggingFace dataset features.""" + properties = [] + for name, feature in features.items(): + print(name, feature) + hf_type = _hf_type_to_str(feature) + weaviate_type = HF_TO_WEAVIATE_TYPE_MAP.get(hf_type) + if weaviate_type is None: + raise ValueError(f"Unsupported feature type: {feature}") + + properties.append( + wc.Property( + name=name.replace("-", "_").replace(" ", "_") if name != "id" else "id_", + data_type=weaviate_type, + index_filterable=name != SEARCHABLE_TEXT_COLUMN, + index_searchable=name != SEARCHABLE_TEXT_COLUMN + and weaviate_type in [WDataType.TEXT, WDataType.TEXT_ARRAY], + skip_vectorization=name != SEARCHABLE_TEXT_COLUMN, + ) + ) + return properties + + +def _create_or_get_weaviate_collection(client: WeaviateClient, collection_name: str, features: Features) -> Collection: + """Get a Weaviate collection by name.""" + if client.collections.exists(collection_name): + collection = client.collections.get(collection_name) + else: + properties = _create_properties(features) + collection = client.collections.create( + collection_name, + properties=properties, + vectorizer_config=wc.Configure.Vectorizer.none(), + vector_index_config=wc.Configure.VectorIndex.dynamic( + distance_metric=wc.VectorDistances.COSINE, + threshold=10_000, + flat=wc.Configure.VectorIndex.flat(quantizer=wc.Configure.VectorIndex.Quantizer.bq(cache=True)), + hnsw=wc.Configure.VectorIndex.hnsw( + quantizer=wc.Configure.VectorIndex.Quantizer.pq(segments=128, training_limit=50_000), + ), + ), + ) + logger.info(f"Created Weaviate collection '{collection_name}' with properties: {properties}") + return collection + + +async def _get_embeddings(texts: list[str], embedding_client: AsyncOpenAI, model_name: str) -> list[list[float]]: + try: + embeddings = await embedding_client.embeddings.create(input=texts, model=model_name) + except OpenAIError as e: + if hasattr(e, "status_code") and e.status_code == 400 and "context" in str(e): + embeddings = await embedding_client.embeddings.create( + input=[text[:10000] for text in texts], model=model_name + ) + else: + raise + return [embedding.embedding for embedding in embeddings.data] + + +async def _producer( + dataset: datasets.Dataset, + batch_size: int, + embedding_client: AsyncOpenAI, + model_name: str, + obj_queue: asyncio.Queue, +) -> None: + """Create batches of objects from the dataset with the vector included.""" + for batch in dataset.iter(batch_size=batch_size): + objects: dict[str, list[Any]] = {} + + # Filter out None or empty strings from the batch + # Get index of empty or None text entries + null_indices = [i for i, text in enumerate(batch[SEARCHABLE_TEXT_COLUMN]) if text is None or text == ""] + # Remove empty or None text entries from the batch + if null_indices: + for key in batch: + objects[key.replace("-", "_").replace(" ", "_")] = [ + v for i, v in enumerate(batch[key]) if i not in null_indices + ] + else: + objects = batch + + # Get embeddings for the batch + embeddings = await _get_embeddings(objects[SEARCHABLE_TEXT_COLUMN], embedding_client, model_name) + + # Rename "id" to "id_" to avoid conflict with Weaviate's reserved field + if "id" in objects: + objects["id_"] = objects.pop("id") + + objects["vector"] = embeddings + + await obj_queue.put(objects) + + +async def _consumer(collection: Collection, obj_queue: asyncio.Queue) -> None: + """Consume objects from the queue and add them to Weaviate.""" + while True: + objects: dict[str, list[Any]] = await obj_queue.get() + if objects is None: + break # Exit signal + + with collection.batch.fixed_size(batch_size=len(objects), concurrent_requests=1) as batch: + vectors = objects.pop("vector") + for i in range(len(objects[SEARCHABLE_TEXT_COLUMN])): + obj = {k.replace("-", "_").replace(" ", "_"): v[i] for k, v in objects.items()} + batch.add_object(obj, vector=vectors[i]) + + # Flush the batch to Weaviate + batch.flush() # type: ignore[attr-defined] + obj_queue.task_done() + + +@click.command() +@click.option("--dataset-name", required=True, help="HuggingFace dataset name") +@click.option("--collection-name", required=True, help="Name of weaviate collection") +@click.option("--model-name", envvar="EMBEDDING_MODEL_NAME", help="Embedding model name") +@click.option("--batch-size", default=5, help="Batch size for processing") +@click.option("--max-concurrent", default=50, help="Max concurrent consumers") +@click.option("--retry-attempts", default=5, help="Number of retry attempts for requests") +@click.option("--timeout-seconds", default=30, help="Timeout for requests in seconds") +@click.option("--cache-dir", help="Dataset cache directory") +def _cli( + dataset_name, + collection_name, + model_name, + batch_size, + max_concurrent, + retry_attempts, + timeout_seconds, + cache_dir, +): + asyncio.run( + main( + dataset_name, + collection_name, + model_name, + batch_size, + max_concurrent, + retry_attempts, + timeout_seconds, + cache_dir, + ) + ) + + +async def main( + dataset_name: str, + collection_name: str, + model_name: str, + batch_size: int, + max_concurrent: int, + retry_attempts: int, + timeout_seconds: int, + cache_dir: str | None = None, +) -> None: + """Generate embeddings for a streaming HuggingFace dataset.""" + # Load dataset + dataset = load_dataset(dataset_name, split="train", cache_dir=cache_dir) + # Remove columns with nested/dict features + dataset = dataset.remove_columns( + [col for col in dataset.column_names if isinstance(dataset.features[col], dict) or "Unnamed" in col] + ) + assert SEARCHABLE_TEXT_COLUMN in dataset.column_names, f"Dataset must contain a '{SEARCHABLE_TEXT_COLUMN}' column" + + # Create Weaviate Client + weaviate_client = connect_to_weaviate_cloud( + cluster_url=os.environ["WEAVIATE_HTTP_HOST"], + auth_credentials=Auth.api_key(os.environ["WEAVIATE_API_KEY"]), + ) + weaviate_client = connect_to_local(headers={"X-OpenAI-Api-Key": os.environ["OPENAI_API_KEY"]}) + logger.info(f"Weaviate client ready: {weaviate_client.is_ready()}") + + # Get or create Weaviate collection + collection = _create_or_get_weaviate_collection(weaviate_client, collection_name, dataset.features) + + # Initialize OpenAI client for embedding service + embedding_client = AsyncOpenAI( + api_key=os.environ["EMBEDDING_API_KEY"], + base_url=os.environ["EMBEDDING_BASE_URL"], + timeout=timeout_seconds, + max_retries=retry_attempts, + ) + + # Orchestrate producer and consumers + obj_queue: asyncio.Queue[Optional[dict[str, list[Any]]]] = asyncio.Queue(maxsize=max_concurrent * 5) + producer = asyncio.create_task(_producer(dataset, batch_size, embedding_client, model_name, obj_queue)) + # Create consumers to process the queue + consumers = [asyncio.create_task(_consumer(collection, obj_queue)) for _ in range(max_concurrent)] + + try: + # Wait for the producer to finish + await producer + + # Signal consumers to stop + for _ in range(max_concurrent): + await obj_queue.put(None) + + # Wait for all consumers to finish + await asyncio.gather(*consumers) + except Exception as e: + logger.error(f"Unexpected error occurred: {e}", exc_info=True) + finally: + weaviate_client.close() + await embedding_client.close() + logger.info("Finished processing dataset and closing clients.") + + +if __name__ == "__main__": + _cli() diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py b/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py index 6d2a4ee..e769845 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py @@ -4,6 +4,7 @@ https://archive.ics.uci.edu/dataset/352/online+retail """ +import asyncio import csv import logging import os @@ -11,7 +12,8 @@ from datetime import datetime from typing import Any -from weaviate.classes.config import Configure, DataType, Property +from openai import AsyncOpenAI, OpenAIError +from weaviate.classes.config import Configure, DataType, Property, VectorDistances from weaviate.client import WeaviateClient from weaviate.collections.collection.sync import Collection from weaviate.connect.helpers import connect_to_local @@ -22,7 +24,9 @@ DATASET_PATH = "/Users/marcelolotif/Downloads/Online Retail Sample 1k.csv" -COLLECTION_NAME = "OnlineRetail" +COLLECTION_NAME = os.getenv("WEAVIATE_COLLECTION_NAME") +EMBEDDING_MODEL_NAME = os.getenv("EMBEDDING_MODEL_NAME") +SEACHEABLE_TEXT_COLUMN = "Description" def is_numeric(value: str) -> bool: @@ -113,22 +117,129 @@ def make_collection_with_column_names(weaviate_client: WeaviateClient, column_na for column_name in column_names: properties.append(Property(name=column_name, data_type=type_for_column_name(column_name))) + assert COLLECTION_NAME is not None, "WEAVIATE_COLLECTION_NAME env var must be set" + return weaviate_client.collections.create( name=COLLECTION_NAME, - vector_config=Configure.Vectors.text2vec_openai(), + vectorizer_config=Configure.Vectorizer.none(), + vector_index_config=Configure.VectorIndex.dynamic( + distance_metric=VectorDistances.COSINE, + threshold=10_000, + flat=Configure.VectorIndex.flat(quantizer=Configure.VectorIndex.Quantizer.bq(cache=True)), + hnsw=Configure.VectorIndex.hnsw( + quantizer=Configure.VectorIndex.Quantizer.pq(segments=128, training_limit=50_000), + ), + ), properties=properties, ) -if __name__ == "__main__": +async def get_embeddings(texts: list[str], embedding_client: AsyncOpenAI, model_name: str) -> list[list[float]]: + """Get embeddings for a list of texts. + + Args: + texts: The texts to get embeddings for. + embedding_client: The embedding client. + model_name: The model name to use. + + Returns + ------- + The embeddings for the texts. + """ + try: + embeddings = await embedding_client.embeddings.create(input=texts, model=model_name) + except OpenAIError as e: + if hasattr(e, "status_code") and e.status_code == 400 and "context" in str(e): + embeddings = await embedding_client.embeddings.create( + input=[text[:10000] for text in texts], model=model_name + ) + else: + raise + return [embedding.embedding for embedding in embeddings.data] + + +def list_of_dicts_to_dict_of_lists(data: list[dict[str, Any]]) -> dict[str, list[Any]]: + """Convert a list of dictionaries to a dictionary of lists. + + Args: + data: List of dictionaries with the same keys + + Returns + ------- + Dictionary where each key maps to a list of values + """ + if not data: + return {} + + keys = data[0].keys() + return {key: [item[key] for item in data] for key in keys} + + +async def producer( + dataset: list[dict[str, Any]], + batch_size: int, + embedding_client: AsyncOpenAI, + model_name: str, + obj_queue: asyncio.Queue, +) -> None: + """Create batches of objects from the dataset with the vector included.""" + for i in range(0, len(dataset), batch_size): + batch = list_of_dicts_to_dict_of_lists(dataset[i : i + batch_size]) + + objects: dict[str, list[Any]] = {} + + # Filter out None or empty strings from the batch + # Get index of empty or None text entries + null_indices = [i for i, text in enumerate(batch[SEACHEABLE_TEXT_COLUMN]) if text is None or text == ""] + # Remove empty or None text entries from the batch + if null_indices: + for key in batch: + objects[key.replace("-", "_").replace(" ", "_")] = [ + v for i, v in enumerate(batch[key]) if i not in null_indices + ] + else: + objects = batch + + # Get embeddings for the batch + embeddings = await get_embeddings(objects[SEACHEABLE_TEXT_COLUMN], embedding_client, model_name) + + # Rename "id" to "id_" to avoid conflict with Weaviate's reserved field + if "id" in objects: + objects["id_"] = objects.pop("id") + + objects["vector"] = embeddings + + await obj_queue.put(objects) + + +async def consumer(collection: Collection, obj_queue: asyncio.Queue) -> None: + """Consume objects from the queue and add them to Weaviate.""" + while True: + objects: dict[str, list[Any]] = await obj_queue.get() + if objects is None: + break # Exit signal + + with collection.batch.fixed_size(batch_size=len(objects), concurrent_requests=1) as batch: + vectors = objects.pop("vector") + for i in range(len(objects[SEACHEABLE_TEXT_COLUMN])): + obj = {k.replace("-", "_").replace(" ", "_"): v[i] for k, v in objects.items()} + batch.add_object(obj, vector=vectors[i]) + + # Flush the batch to Weaviate + batch.flush() # type: ignore[attr-defined] + + obj_queue.task_done() + + +async def main(): + """Import data from the Online Retail dataset into Weaviate.""" weaviate_client = connect_to_local(headers={"X-OpenAI-Api-Key": os.environ["OPENAI_API_KEY"]}) weaviate_collection = None + data_rows = [] with open(DATASET_PATH, "r") as file: csv_reader = csv.reader(file) - column_names = None - data_rows = [] first_row = True logger.info("Collecting data points...") @@ -156,22 +267,52 @@ def make_collection_with_column_names(weaviate_client: WeaviateClient, column_na assert weaviate_collection is not None, "Weaviate collection should not be None" - logger.info("Importing data points...") - with weaviate_collection.batch.fixed_size(batch_size=100, concurrent_requests=2) as batch: - for data_row in data_rows: - batch.add_object(properties=data_row) - - # Check for failed objects - failed_objects = weaviate_collection.batch.failed_objects - if failed_objects: - print(f"Number of failed imports: {len(failed_objects)}") - for failed in failed_objects[:3]: # Show first 3 failures - print(f"Failed object: {failed}") - - # Verify client-side batch import - result = weaviate_collection.aggregate.over_all(total_count=True) - logger.info(f"Client-side batch had {len(failed_objects)} failures") - logger.info(f"Expected {len(data_rows)} objects, got {result.total_count}") - logger.info(f"✓ Client-side batch: {result.total_count} objects imported successfully") - - weaviate_client.close() + batch_size = 5 + max_concurrent = 50 + + embedding_client = AsyncOpenAI( + api_key=os.environ["EMBEDDING_API_KEY"], + base_url=os.environ["EMBEDDING_BASE_URL"], + timeout=30, + max_retries=5, + ) + + # Orchestrate producer and consumers + obj_queue = asyncio.Queue(maxsize=max_concurrent * 5) + producer_task = asyncio.create_task( + producer(data_rows, batch_size, embedding_client, EMBEDDING_MODEL_NAME, obj_queue) + ) + # Create consumers to process the queue + consumer_tasks = [asyncio.create_task(consumer(weaviate_collection, obj_queue)) for _ in range(max_concurrent)] + + try: + # Wait for the producer to finish + await producer_task + + # Signal consumers to stop + for _ in range(max_concurrent): + await obj_queue.put(None) + + # Wait for all consumers to finish + await asyncio.gather(*consumer_tasks) + except Exception as e: + logger.error(f"Unexpected error occurred: {e}", exc_info=True) + + # Check for failed objects + failed_objects = weaviate_collection.batch.failed_objects + if failed_objects: + print(f"Number of failed imports: {len(failed_objects)}") + for failed in failed_objects[:3]: # Show first 3 failures + print(f"Failed object: {failed}") + + # Verify client-side batch import + result = weaviate_collection.aggregate.over_all(total_count=True) + logger.info(f"Client-side batch had {len(failed_objects)} failures") + logger.info(f"Expected {len(data_rows)} objects, got {result.total_count}") + logger.info(f"✓ Client-side batch: {result.total_count} objects imported successfully") + + weaviate_client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml b/aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml new file mode 100644 index 0000000..0791edd --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml @@ -0,0 +1,13 @@ +services: + weaviate: + image: cr.weaviate.io/semitechnologies/weaviate:1.35.3 + ports: + - 8080:8080 + - 50051:50051 + restart: on-failure:0 + environment: + QUERY_DEFAULTS_LIMIT: 25 + AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' + PERSISTENCE_DATA_PATH: '/var/lib/weaviate' + CLUSTER_HOSTNAME: 'node1' + ASYNC_INDEXING: 'true' From 6592a1ce6c6597f9e7d5fdad47cc6af5071a94a8 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 22 Jan 2026 11:55:41 -0500 Subject: [PATCH 04/10] Deleting weaviate stuff, using Online Retail dataset instead --- .../report_generation/async_client_manager.py | 64 ++-- .../report_generation/create_weaviate_kb.py | 270 --------------- .../report_generation/data/convert_dates.py | 100 ++++++ .../report_generation/data_import.py | 318 ------------------ .../report_generation/docker-compose.yaml | 13 - .../agent_evals/report_generation/main.py | 9 +- .../agent_evals/report_generation/weaviate.py | 210 ------------ 7 files changed, 141 insertions(+), 843 deletions(-) delete mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/data/convert_dates.py delete mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py delete mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml delete mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py index 46cc520..77da0de 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py @@ -5,15 +5,38 @@ hot-reload process. """ +import sqlite3 +from typing import Any + from aieng.agent_evals.report_generation.utils import Configs -from aieng.agent_evals.report_generation.weaviate import ( - AsyncWeaviateKnowledgeBase, - get_weaviate_async_client, -) from openai import AsyncOpenAI from weaviate.client import WeaviateAsyncClient +class SQLiteConnection: + """SQLite connection.""" + + def __init__(self) -> None: + self._connection = sqlite3.connect("aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db") + + def execute(self, query: str) -> list[Any]: + """Execute a SQLite query. + + Args: + query: The SQLite query to execute. + + Returns + ------- + The result of the query. Will return the result of + `execute(query).fetchall()`. + """ + return self._connection.execute(query).fetchall() + + def close(self) -> None: + """Close the SQLite connection.""" + self._connection.close() + + class AsyncClientManager: """Manages async client lifecycle with lazy initialization and cleanup. @@ -55,7 +78,7 @@ def __init__(self, configs: Configs | None = None) -> None: self._configs: Configs | None = configs self._weaviate_client: WeaviateAsyncClient | None = None self._openai_client: AsyncOpenAI | None = None - self._knowledgebase: AsyncWeaviateKnowledgeBase | None = None + self._sqlite_connection: SQLiteConnection | None = None self._initialized: bool = False @property @@ -74,36 +97,23 @@ def openai_client(self) -> AsyncOpenAI: return self._openai_client @property - def weaviate_client(self) -> WeaviateAsyncClient: - """Get or create Weaviate client.""" - if self._weaviate_client is None: - self._weaviate_client = get_weaviate_async_client(self.configs) - self._initialized = True - return self._weaviate_client - - @property - def knowledgebase(self) -> AsyncWeaviateKnowledgeBase: - """Get or create knowledge base instance.""" - if self._knowledgebase is None: - self._knowledgebase = AsyncWeaviateKnowledgeBase( - self.weaviate_client, - collection_name=self.configs.weaviate_collection_name, - embedding_model_name=self.configs.embedding_model_name, - ) + def sqlite_connection(self) -> SQLiteConnection: + """Get or create SQLite session.""" + if self._sqlite_connection is None: + self._sqlite_connection = SQLiteConnection() self._initialized = True - return self._knowledgebase + return self._sqlite_connection async def close(self) -> None: """Close all initialized async clients.""" - if self._weaviate_client is not None: - await self._weaviate_client.close() - self._weaviate_client = None - if self._openai_client is not None: await self._openai_client.close() self._openai_client = None - self._knowledgebase = None + if self._sqlite_connection is not None: + self._sqlite_connection.close() + self._sqlite_connection = None + self._initialized = False def is_initialized(self) -> bool: diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py b/aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py deleted file mode 100644 index 7359c78..0000000 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/create_weaviate_kb.py +++ /dev/null @@ -1,270 +0,0 @@ -"""Create a knowledge base in Weaviate from a HuggingFace dataset.""" - -import asyncio -import logging -import os -from typing import Any, Optional - -import click -import datasets -import weaviate.classes.config as wc -from datasets import Features, Sequence, Value, load_dataset -from dotenv import load_dotenv -from openai import AsyncOpenAI, OpenAIError -from weaviate.classes.config import DataType as WDataType -from weaviate.classes.init import Auth -from weaviate.client import WeaviateClient -from weaviate.collections.collection.sync import Collection -from weaviate.connect.helpers import connect_to_local, connect_to_weaviate_cloud - - -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") -load_dotenv(".env.participants") -logger = logging.getLogger(__name__) - - -SEARCHABLE_TEXT_COLUMN = "Description" - -HF_TO_WEAVIATE_TYPE_MAP = { - "string": WDataType.TEXT, - "list[string]": WDataType.TEXT_ARRAY, - "int64": WDataType.INT, - "int32": WDataType.INT, - "list[int64]": WDataType.INT_ARRAY, - "list[int32]": WDataType.INT_ARRAY, - "bool": WDataType.BOOL, - "list[bool]": WDataType.BOOL_ARRAY, - "float64": WDataType.NUMBER, - "float32": WDataType.NUMBER, - "list[float64]": WDataType.NUMBER_ARRAY, - "list[float32]": WDataType.NUMBER_ARRAY, - "timestamp[s]": WDataType.DATE, - "list[timestamp[s]]": WDataType.DATE_ARRAY, - "binary": WDataType.BLOB, - "dict": WDataType.OBJECT, -} - - -def _hf_type_to_str(feature: Any) -> str: - """Convert HuggingFace feature type to a string representation.""" - if isinstance(feature, Value): - return feature.dtype # e.g., 'string', 'int64', 'float64', 'bool', 'binary' - if isinstance(feature, Sequence): - # Recursively get the type of the inner feature - return f"list[{_hf_type_to_str(feature.feature)}]" - if isinstance(feature, list): - return f"list[{_hf_type_to_str(feature[0])}]" - if isinstance(feature, dict): - return "dict" - return str(type(feature)) - - -def _create_properties(features: Features) -> list[wc.Property]: - """Create Weaviate properties from HuggingFace dataset features.""" - properties = [] - for name, feature in features.items(): - print(name, feature) - hf_type = _hf_type_to_str(feature) - weaviate_type = HF_TO_WEAVIATE_TYPE_MAP.get(hf_type) - if weaviate_type is None: - raise ValueError(f"Unsupported feature type: {feature}") - - properties.append( - wc.Property( - name=name.replace("-", "_").replace(" ", "_") if name != "id" else "id_", - data_type=weaviate_type, - index_filterable=name != SEARCHABLE_TEXT_COLUMN, - index_searchable=name != SEARCHABLE_TEXT_COLUMN - and weaviate_type in [WDataType.TEXT, WDataType.TEXT_ARRAY], - skip_vectorization=name != SEARCHABLE_TEXT_COLUMN, - ) - ) - return properties - - -def _create_or_get_weaviate_collection(client: WeaviateClient, collection_name: str, features: Features) -> Collection: - """Get a Weaviate collection by name.""" - if client.collections.exists(collection_name): - collection = client.collections.get(collection_name) - else: - properties = _create_properties(features) - collection = client.collections.create( - collection_name, - properties=properties, - vectorizer_config=wc.Configure.Vectorizer.none(), - vector_index_config=wc.Configure.VectorIndex.dynamic( - distance_metric=wc.VectorDistances.COSINE, - threshold=10_000, - flat=wc.Configure.VectorIndex.flat(quantizer=wc.Configure.VectorIndex.Quantizer.bq(cache=True)), - hnsw=wc.Configure.VectorIndex.hnsw( - quantizer=wc.Configure.VectorIndex.Quantizer.pq(segments=128, training_limit=50_000), - ), - ), - ) - logger.info(f"Created Weaviate collection '{collection_name}' with properties: {properties}") - return collection - - -async def _get_embeddings(texts: list[str], embedding_client: AsyncOpenAI, model_name: str) -> list[list[float]]: - try: - embeddings = await embedding_client.embeddings.create(input=texts, model=model_name) - except OpenAIError as e: - if hasattr(e, "status_code") and e.status_code == 400 and "context" in str(e): - embeddings = await embedding_client.embeddings.create( - input=[text[:10000] for text in texts], model=model_name - ) - else: - raise - return [embedding.embedding for embedding in embeddings.data] - - -async def _producer( - dataset: datasets.Dataset, - batch_size: int, - embedding_client: AsyncOpenAI, - model_name: str, - obj_queue: asyncio.Queue, -) -> None: - """Create batches of objects from the dataset with the vector included.""" - for batch in dataset.iter(batch_size=batch_size): - objects: dict[str, list[Any]] = {} - - # Filter out None or empty strings from the batch - # Get index of empty or None text entries - null_indices = [i for i, text in enumerate(batch[SEARCHABLE_TEXT_COLUMN]) if text is None or text == ""] - # Remove empty or None text entries from the batch - if null_indices: - for key in batch: - objects[key.replace("-", "_").replace(" ", "_")] = [ - v for i, v in enumerate(batch[key]) if i not in null_indices - ] - else: - objects = batch - - # Get embeddings for the batch - embeddings = await _get_embeddings(objects[SEARCHABLE_TEXT_COLUMN], embedding_client, model_name) - - # Rename "id" to "id_" to avoid conflict with Weaviate's reserved field - if "id" in objects: - objects["id_"] = objects.pop("id") - - objects["vector"] = embeddings - - await obj_queue.put(objects) - - -async def _consumer(collection: Collection, obj_queue: asyncio.Queue) -> None: - """Consume objects from the queue and add them to Weaviate.""" - while True: - objects: dict[str, list[Any]] = await obj_queue.get() - if objects is None: - break # Exit signal - - with collection.batch.fixed_size(batch_size=len(objects), concurrent_requests=1) as batch: - vectors = objects.pop("vector") - for i in range(len(objects[SEARCHABLE_TEXT_COLUMN])): - obj = {k.replace("-", "_").replace(" ", "_"): v[i] for k, v in objects.items()} - batch.add_object(obj, vector=vectors[i]) - - # Flush the batch to Weaviate - batch.flush() # type: ignore[attr-defined] - obj_queue.task_done() - - -@click.command() -@click.option("--dataset-name", required=True, help="HuggingFace dataset name") -@click.option("--collection-name", required=True, help="Name of weaviate collection") -@click.option("--model-name", envvar="EMBEDDING_MODEL_NAME", help="Embedding model name") -@click.option("--batch-size", default=5, help="Batch size for processing") -@click.option("--max-concurrent", default=50, help="Max concurrent consumers") -@click.option("--retry-attempts", default=5, help="Number of retry attempts for requests") -@click.option("--timeout-seconds", default=30, help="Timeout for requests in seconds") -@click.option("--cache-dir", help="Dataset cache directory") -def _cli( - dataset_name, - collection_name, - model_name, - batch_size, - max_concurrent, - retry_attempts, - timeout_seconds, - cache_dir, -): - asyncio.run( - main( - dataset_name, - collection_name, - model_name, - batch_size, - max_concurrent, - retry_attempts, - timeout_seconds, - cache_dir, - ) - ) - - -async def main( - dataset_name: str, - collection_name: str, - model_name: str, - batch_size: int, - max_concurrent: int, - retry_attempts: int, - timeout_seconds: int, - cache_dir: str | None = None, -) -> None: - """Generate embeddings for a streaming HuggingFace dataset.""" - # Load dataset - dataset = load_dataset(dataset_name, split="train", cache_dir=cache_dir) - # Remove columns with nested/dict features - dataset = dataset.remove_columns( - [col for col in dataset.column_names if isinstance(dataset.features[col], dict) or "Unnamed" in col] - ) - assert SEARCHABLE_TEXT_COLUMN in dataset.column_names, f"Dataset must contain a '{SEARCHABLE_TEXT_COLUMN}' column" - - # Create Weaviate Client - weaviate_client = connect_to_weaviate_cloud( - cluster_url=os.environ["WEAVIATE_HTTP_HOST"], - auth_credentials=Auth.api_key(os.environ["WEAVIATE_API_KEY"]), - ) - weaviate_client = connect_to_local(headers={"X-OpenAI-Api-Key": os.environ["OPENAI_API_KEY"]}) - logger.info(f"Weaviate client ready: {weaviate_client.is_ready()}") - - # Get or create Weaviate collection - collection = _create_or_get_weaviate_collection(weaviate_client, collection_name, dataset.features) - - # Initialize OpenAI client for embedding service - embedding_client = AsyncOpenAI( - api_key=os.environ["EMBEDDING_API_KEY"], - base_url=os.environ["EMBEDDING_BASE_URL"], - timeout=timeout_seconds, - max_retries=retry_attempts, - ) - - # Orchestrate producer and consumers - obj_queue: asyncio.Queue[Optional[dict[str, list[Any]]]] = asyncio.Queue(maxsize=max_concurrent * 5) - producer = asyncio.create_task(_producer(dataset, batch_size, embedding_client, model_name, obj_queue)) - # Create consumers to process the queue - consumers = [asyncio.create_task(_consumer(collection, obj_queue)) for _ in range(max_concurrent)] - - try: - # Wait for the producer to finish - await producer - - # Signal consumers to stop - for _ in range(max_concurrent): - await obj_queue.put(None) - - # Wait for all consumers to finish - await asyncio.gather(*consumers) - except Exception as e: - logger.error(f"Unexpected error occurred: {e}", exc_info=True) - finally: - weaviate_client.close() - await embedding_client.close() - logger.info("Finished processing dataset and closing clients.") - - -if __name__ == "__main__": - _cli() diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/data/convert_dates.py b/aieng-eval-agents/aieng/agent_evals/report_generation/data/convert_dates.py new file mode 100644 index 0000000..24f0c91 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/data/convert_dates.py @@ -0,0 +1,100 @@ +"""Convert InvoiceDate format in OnlineRetail.db. + +Convert from 'MM/DD/YY HH:MM' to 'YYYY-MM-DD HH:MM' for better searching abilities. +""" + +import logging +import sqlite3 +from datetime import datetime + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +DB_PATH = "aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db" + + +def convert_date(date_str: str) -> str | None: + """Convert date from 'MM/DD/YY HH:MM' to 'YYYY-MM-DD HH:MM'. + + Args: + date_str: Date string in format 'MM/DD/YY HH:MM' or 'MM/DD/YY H:MM' + Example: "12/19/10 16:26" -> "2010-12-19 16:26" + + Returns + ------- + Converted date string in format 'YYYY-MM-DD HH:MM' or None if parsing fails + """ + if not date_str or date_str.strip() == "": + return None + + try: + # Parse the date - format is DD/MM/YY (day/month/year) + # Format: "12/1/10 8:26" or "12/1/10 16:26" + # Split date and time parts + parts = date_str.strip().split(" ") + if len(parts) != 2: + logger.warning(f"Invalid date format (expected 'DD/MM/YY HH:MM'): {date_str}") + return None + + date_part, time_part = parts + + # Normalize time part to have 2-digit hour + time_parts = time_part.split(":") + if len(time_parts) != 2: + logger.warning(f"Invalid time format: {time_part}") + return None + + hour, minute = time_parts + if len(hour) == 1: + hour = f"0{hour}" + time_part = f"{hour}:{minute}" + + # Parse as DD/MM/YY (day/month/year) + dt = datetime.strptime(f"{date_part} {time_part}", "%m/%d/%y %H:%M") + # Convert to YYYY-MM-DD HH:MM format + return dt.strftime("%Y-%m-%d %H:%M") + except ValueError as e: + logger.warning(f"Could not parse date: {date_str} - {e}") + return None + + +def main(): + """Convert all InvoiceDate values in the database.""" + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # Get all rows with InvoiceDate + cursor.execute("SELECT rowid, InvoiceDate FROM sales WHERE InvoiceDate IS NOT NULL AND InvoiceDate != ''") + rows = cursor.fetchall() + + logger.info(f"Found {len(rows)} rows with InvoiceDate to convert") + + updated_count = 0 + error_count = 0 + + for rowid, old_date in rows: + new_date = convert_date(old_date) + if new_date: + try: + cursor.execute("UPDATE sales SET InvoiceDate = ? WHERE rowid = ?", (new_date, rowid)) + updated_count += 1 + if updated_count % 100 == 0: + logger.info(f"Updated {updated_count} rows...") + except Exception as e: + logger.error(f"Error updating rowid {rowid}: {e}") + error_count += 1 + else: + logger.warning(f"Could not convert date for rowid {rowid}: {old_date}") + error_count += 1 + + conn.commit() + conn.close() + + logger.info("Conversion complete!") + logger.info(f" Updated: {updated_count} rows") + logger.info(f" Errors: {error_count} rows") + + +if __name__ == "__main__": + main() diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py b/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py deleted file mode 100644 index e769845..0000000 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/data_import.py +++ /dev/null @@ -1,318 +0,0 @@ -""" -Import data from the Online Retail dataset into Weaviate. - -https://archive.ics.uci.edu/dataset/352/online+retail -""" - -import asyncio -import csv -import logging -import os -from copy import copy -from datetime import datetime -from typing import Any - -from openai import AsyncOpenAI, OpenAIError -from weaviate.classes.config import Configure, DataType, Property, VectorDistances -from weaviate.client import WeaviateClient -from weaviate.collections.collection.sync import Collection -from weaviate.connect.helpers import connect_to_local - - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -DATASET_PATH = "/Users/marcelolotif/Downloads/Online Retail Sample 1k.csv" -COLLECTION_NAME = os.getenv("WEAVIATE_COLLECTION_NAME") -EMBEDDING_MODEL_NAME = os.getenv("EMBEDDING_MODEL_NAME") -SEACHEABLE_TEXT_COLUMN = "Description" - - -def is_numeric(value: str) -> bool: - """Check if a string represents a numeric value. - - Args: - value: The value to check. - - Returns - ------- - True if the value is a numeric value, False otherwise. - """ - try: - float(value) - return True - except ValueError: - return False - - -def type_for_column_name(column_name: str) -> DataType: - """Get the data type for a column name. - - Args: - column_name: The name of the column. - - Returns - ------- - The data type for the column. - """ - if column_name in ["InvoiceNo", "Quantity", "CustomerID"]: - return DataType.INT - if column_name == "UnitPrice": - return DataType.NUMBER - if column_name == "InvoiceDate": - return DataType.DATE - return DataType.TEXT - - -def make_data_row(column_names: list[str], row: list[str]) -> dict[str, Any] | None: - """Make a data row from a list of column names and a list of values. - - Args: - column_names: The names of the columns. - row: The values of the row. - - Returns - ------- - The data row, or None if the row is invalid. - """ - if len(row) == 0: - logger.warning(f"Skipping row because it has no values: {row}") - return None - - data_row: dict[str, Any] = {} - for i in range(len(row)): - column_name = column_names[i] - value = row[i] - - try: - data_type = type_for_column_name(column_name) - if data_type == DataType.INT: - data_row[column_name] = int(value) - elif data_type == DataType.NUMBER: - data_row[column_name] = float(value) - elif data_type == DataType.DATE: - data_row[column_name] = datetime.strptime(value, "%m/%d/%y %H:%M") - else: - data_row[column_name] = str(value) - except Exception: - logger.exception(f"Skipping row because of error: {row}") - return None - - return data_row - - -def make_collection_with_column_names(weaviate_client: WeaviateClient, column_names: list[str]) -> Collection: - """Make a collection with the column names of the rows. - - Args: - weaviate_client: The Weaviate client. - column_names: The names of the columns. - - Returns - ------- - The weaviate collection object. - """ - properties = [] - for column_name in column_names: - properties.append(Property(name=column_name, data_type=type_for_column_name(column_name))) - - assert COLLECTION_NAME is not None, "WEAVIATE_COLLECTION_NAME env var must be set" - - return weaviate_client.collections.create( - name=COLLECTION_NAME, - vectorizer_config=Configure.Vectorizer.none(), - vector_index_config=Configure.VectorIndex.dynamic( - distance_metric=VectorDistances.COSINE, - threshold=10_000, - flat=Configure.VectorIndex.flat(quantizer=Configure.VectorIndex.Quantizer.bq(cache=True)), - hnsw=Configure.VectorIndex.hnsw( - quantizer=Configure.VectorIndex.Quantizer.pq(segments=128, training_limit=50_000), - ), - ), - properties=properties, - ) - - -async def get_embeddings(texts: list[str], embedding_client: AsyncOpenAI, model_name: str) -> list[list[float]]: - """Get embeddings for a list of texts. - - Args: - texts: The texts to get embeddings for. - embedding_client: The embedding client. - model_name: The model name to use. - - Returns - ------- - The embeddings for the texts. - """ - try: - embeddings = await embedding_client.embeddings.create(input=texts, model=model_name) - except OpenAIError as e: - if hasattr(e, "status_code") and e.status_code == 400 and "context" in str(e): - embeddings = await embedding_client.embeddings.create( - input=[text[:10000] for text in texts], model=model_name - ) - else: - raise - return [embedding.embedding for embedding in embeddings.data] - - -def list_of_dicts_to_dict_of_lists(data: list[dict[str, Any]]) -> dict[str, list[Any]]: - """Convert a list of dictionaries to a dictionary of lists. - - Args: - data: List of dictionaries with the same keys - - Returns - ------- - Dictionary where each key maps to a list of values - """ - if not data: - return {} - - keys = data[0].keys() - return {key: [item[key] for item in data] for key in keys} - - -async def producer( - dataset: list[dict[str, Any]], - batch_size: int, - embedding_client: AsyncOpenAI, - model_name: str, - obj_queue: asyncio.Queue, -) -> None: - """Create batches of objects from the dataset with the vector included.""" - for i in range(0, len(dataset), batch_size): - batch = list_of_dicts_to_dict_of_lists(dataset[i : i + batch_size]) - - objects: dict[str, list[Any]] = {} - - # Filter out None or empty strings from the batch - # Get index of empty or None text entries - null_indices = [i for i, text in enumerate(batch[SEACHEABLE_TEXT_COLUMN]) if text is None or text == ""] - # Remove empty or None text entries from the batch - if null_indices: - for key in batch: - objects[key.replace("-", "_").replace(" ", "_")] = [ - v for i, v in enumerate(batch[key]) if i not in null_indices - ] - else: - objects = batch - - # Get embeddings for the batch - embeddings = await get_embeddings(objects[SEACHEABLE_TEXT_COLUMN], embedding_client, model_name) - - # Rename "id" to "id_" to avoid conflict with Weaviate's reserved field - if "id" in objects: - objects["id_"] = objects.pop("id") - - objects["vector"] = embeddings - - await obj_queue.put(objects) - - -async def consumer(collection: Collection, obj_queue: asyncio.Queue) -> None: - """Consume objects from the queue and add them to Weaviate.""" - while True: - objects: dict[str, list[Any]] = await obj_queue.get() - if objects is None: - break # Exit signal - - with collection.batch.fixed_size(batch_size=len(objects), concurrent_requests=1) as batch: - vectors = objects.pop("vector") - for i in range(len(objects[SEACHEABLE_TEXT_COLUMN])): - obj = {k.replace("-", "_").replace(" ", "_"): v[i] for k, v in objects.items()} - batch.add_object(obj, vector=vectors[i]) - - # Flush the batch to Weaviate - batch.flush() # type: ignore[attr-defined] - - obj_queue.task_done() - - -async def main(): - """Import data from the Online Retail dataset into Weaviate.""" - weaviate_client = connect_to_local(headers={"X-OpenAI-Api-Key": os.environ["OPENAI_API_KEY"]}) - weaviate_collection = None - data_rows = [] - - with open(DATASET_PATH, "r") as file: - csv_reader = csv.reader(file) - column_names = None - first_row = True - logger.info("Collecting data points...") - - for r in csv_reader: - row = copy(r) - if first_row: - first_row = False - - column_names = row - - if weaviate_client.collections.exists(COLLECTION_NAME): - weaviate_collection = weaviate_client.collections.use(COLLECTION_NAME) - else: - weaviate_collection = make_collection_with_column_names(weaviate_client, column_names) - - else: - if not is_numeric(row[0]): - logger.warning(f"Skipping row because it seems to be invalid: {row}") - continue - - assert column_names is not None, "Column names should not be None" - data_row = make_data_row(column_names, row) - if data_row is not None: - data_rows.append(data_row) - - assert weaviate_collection is not None, "Weaviate collection should not be None" - - batch_size = 5 - max_concurrent = 50 - - embedding_client = AsyncOpenAI( - api_key=os.environ["EMBEDDING_API_KEY"], - base_url=os.environ["EMBEDDING_BASE_URL"], - timeout=30, - max_retries=5, - ) - - # Orchestrate producer and consumers - obj_queue = asyncio.Queue(maxsize=max_concurrent * 5) - producer_task = asyncio.create_task( - producer(data_rows, batch_size, embedding_client, EMBEDDING_MODEL_NAME, obj_queue) - ) - # Create consumers to process the queue - consumer_tasks = [asyncio.create_task(consumer(weaviate_collection, obj_queue)) for _ in range(max_concurrent)] - - try: - # Wait for the producer to finish - await producer_task - - # Signal consumers to stop - for _ in range(max_concurrent): - await obj_queue.put(None) - - # Wait for all consumers to finish - await asyncio.gather(*consumer_tasks) - except Exception as e: - logger.error(f"Unexpected error occurred: {e}", exc_info=True) - - # Check for failed objects - failed_objects = weaviate_collection.batch.failed_objects - if failed_objects: - print(f"Number of failed imports: {len(failed_objects)}") - for failed in failed_objects[:3]: # Show first 3 failures - print(f"Failed object: {failed}") - - # Verify client-side batch import - result = weaviate_collection.aggregate.over_all(total_count=True) - logger.info(f"Client-side batch had {len(failed_objects)} failures") - logger.info(f"Expected {len(data_rows)} objects, got {result.total_count}") - logger.info(f"✓ Client-side batch: {result.total_count} objects imported successfully") - - weaviate_client.close() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml b/aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml deleted file mode 100644 index 0791edd..0000000 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/docker-compose.yaml +++ /dev/null @@ -1,13 +0,0 @@ -services: - weaviate: - image: cr.weaviate.io/semitechnologies/weaviate:1.35.3 - ports: - - 8080:8080 - - 50051:50051 - restart: on-failure:0 - environment: - QUERY_DEFAULTS_LIMIT: 25 - AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' - PERSISTENCE_DATA_PATH: '/var/lib/weaviate' - CLUSTER_HOSTNAME: 'node1' - ASYNC_INDEXING: 'true' diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py index b2e3bd8..d45c2d0 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py @@ -16,10 +16,9 @@ REACT_INSTRUCTIONS = """\ -Perform the task using the search tool. \ +Perform the task using the SQLite database tool. \ EACH TIME before invoking the function, you must explain your reasons for doing so. \ -Be sure to mention the sources in your response. \ -If the search tool did not return intended results, try again. \ +If the SQL query did not return intended results, try again. \ For best performance, divide complex queries into simpler sub-queries. \ Do not make up information. \ For facts that might change over time, you must use the search tool to retrieve the \ @@ -52,7 +51,7 @@ async def _main( # We wrap the `search_knowledgebase` method with `function_tool`, which # will construct the tool definition JSON schema by extracting the necessary # information from the method signature and docstring. - tools=[agents.function_tool(client_manager.knowledgebase.search_knowledgebase)], + tools=[agents.function_tool(client_manager.sqlite_connection.execute)], model=agents.OpenAIChatCompletionsModel( model=client_manager.configs.default_worker_model, openai_client=client_manager.openai_client, @@ -86,7 +85,7 @@ async def _main( # NOTE: Examples must be a list of lists when additional inputs are provided additional_inputs=gr.State(value={}, render=False), examples=[ - ["Generate a monthly sales performance report for the last year that data is available."], + ["Generate a monthly sales performance report for the last year with available data."], ], title="2.1: ReAct for Retrieval-Augmented Generation with OpenAI Agent SDK", ) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py b/aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py deleted file mode 100644 index a0eeaaf..0000000 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/weaviate.py +++ /dev/null @@ -1,210 +0,0 @@ -"""Implements knowledge retrieval tool for Weaviate.""" - -import asyncio -import logging -import os -from typing import Awaitable, Callable, TypeVar - -import backoff -import openai -import pydantic -from aieng.agent_evals.report_generation.utils import Configs -from weaviate.client import WeaviateAsyncClient -from weaviate.connect.helpers import use_async_with_custom - - -class _Source(pydantic.BaseModel): - """Type hints for the "_source" field in ES Search Results.""" - - title: str - section: str | None = None - - -class _Highlight(pydantic.BaseModel): - """Type hints for the "highlight" field in ES Search Results.""" - - text: list[str] - - -class _SearchResult(pydantic.BaseModel): - """Type hints for knowledge base search result.""" - - source: _Source = pydantic.Field(alias="_source") - highlight: _Highlight - - def __repr__(self) -> str: - return self.model_dump_json(indent=2) - - -SearchResults = list[_SearchResult] - - -class AsyncWeaviateKnowledgeBase: - """Configurable search tools for Weaviate knowledge base.""" - - def __init__( - self, - async_client: WeaviateAsyncClient, - collection_name: str, - num_results: int = 5, - snippet_length: int = 1000, - max_concurrency: int = 3, - embedding_model_name: str = "@cf/baai/bge-m3", - embedding_api_key: str | None = None, - embedding_base_url: str | None = None, - ) -> None: - self.async_client = async_client - self.collection_name = collection_name - self.num_results = num_results - self.snippet_length = snippet_length - self.logger = logging.getLogger(__name__) - self.semaphore = asyncio.Semaphore(max_concurrency) - - self.embedding_model_name = embedding_model_name - self.embedding_api_key = embedding_api_key - self.embedding_base_url = embedding_base_url - - self._embed_client = openai.OpenAI( - api_key=self.embedding_api_key or os.getenv("EMBEDDING_API_KEY"), - base_url=self.embedding_base_url or os.getenv("EMBEDDING_BASE_URL"), - max_retries=5, - ) - - @backoff.on_exception(backoff.expo, exception=asyncio.CancelledError) # type: ignore - async def search_knowledgebase(self, keyword: str) -> SearchResults: - """Search knowledge base. - - Parameters - ---------- - keyword : str - The search keyword to query the knowledge base. - - Returns - ------- - SearchResults - A list of search results. Each result contains source and highlight. - If no results are found, returns an empty list. - - Raises - ------ - Exception - If Weaviate is not ready to accept requests (HTTP 503). - - """ - async with self.async_client: - if not await self.async_client.is_ready(): - raise Exception("Weaviate is not ready to accept requests (HTTP 503).") - - collection = self.async_client.collections.get(self.collection_name) - vector = self._vectorize(keyword) - response = await rate_limited( - lambda: collection.query.hybrid( - keyword, vector=vector, limit=self.num_results - ), - semaphore=self.semaphore, - ) - - self.logger.info(f"Query: {keyword}; Returned matches: {len(response.objects)}") - - hits = [] - for obj in response.objects: - text = obj.properties.get("text", "") - assert isinstance(text, str) - - hit = { - "_source": { - "title": obj.properties.get("title", ""), - "section": obj.properties.get("section", None), - }, - "highlight": {"text": [text[: self.snippet_length]]}, - } - hits.append(hit) - - return [_SearchResult.model_validate(_hit) for _hit in hits] - - def _vectorize(self, text: str) -> list[float]: - """Vectorize text using the embedding client. - - Parameters - ---------- - text : str - The text to be vectorized. - - Returns - ------- - list[float] - A list of floats representing the vectorized text. - """ - response = self._embed_client.embeddings.create( - input=text, model=self.embedding_model_name - ) - return response.data[0].embedding - - -def get_weaviate_async_client(configs: Configs) -> WeaviateAsyncClient: - """Get an async Weaviate client. - - If no parameters are provided, the function will attempt to connect to a local - Weaviate instance using environment variables. - - Parameters - ---------- - http_host : str, optional, default=None - The HTTP host for the Weaviate instance. If not provided, defaults to the - `WEAVIATE_HTTP_HOST` environment variable or "localhost" if the environment - variable is not set. - http_port : int, optional, default=None - The HTTP port for the Weaviate instance. If not provided, defaults to the - `WEAVIATE_HTTP_PORT` environment variable or 8080 if the environment variable - is not set. - http_secure : bool, optional, default=False - Whether to use HTTPS for the HTTP connection. Defaults to the - `WEAVIATE_HTTP_SECURE` environment variable or `False` if the environment - variable is not set. - grpc_host : str, optional, default=None - The gRPC host for the Weaviate instance. If not provided, defaults to the - `WEAVIATE_GRPC_HOST` environment variable or "localhost" if the environment - variable is not set. - grpc_port : int, optional, default=None - The gRPC port for the Weaviate instance. If not provided, defaults to the - `WEAVIATE_GRPC_PORT` environment variable or 50051 if the environment variable - is not set. - grpc_secure : bool, optional, default=False - Whether to use secure gRPC. Defaults to the `WEAVIATE_GRPC_SECURE` environment - variable or `False` if the environment variable is not set. - api_key : str, optional, default=None - The API key for authentication with Weaviate. If not provided, defaults to the - `WEAVIATE_API_KEY` environment variable. - headers : dict[str, str], optional, default=None - Additional headers to include in the request. - additional_config : AdditionalConfig, optional, default=None - Additional configuration for the Weaviate client. - skip_init_checks : bool, optional, default=False - Whether to skip initialization checks. - - Returns - ------- - WeaviateAsyncClient - An asynchronous Weaviate client configured with the provided parameters. - """ - return use_async_with_custom( - http_host=configs.weaviate_http_host or "localhost", - http_port=configs.weaviate_http_port or 8080, - http_secure=configs.weaviate_http_secure or False, - grpc_host=configs.weaviate_grpc_host or "localhost", - grpc_port=configs.weaviate_grpc_port or 50051, - grpc_secure=configs.weaviate_grpc_secure or False, - auth_credentials=configs.weaviate_api_key or None, - ) - - -T = TypeVar("T") - - -async def rate_limited( - _fn: Callable[[], Awaitable[T]], - semaphore: asyncio.Semaphore, -) -> T: - """Run _fn with semaphore rate limit.""" - async with semaphore: - return await _fn() From 22fc5691cb80e3164ccd886c2dac68e08aebbf39 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 22 Jan 2026 12:28:40 -0500 Subject: [PATCH 05/10] Adding more report examples --- .../aieng/agent_evals/report_generation/main.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py index d45c2d0..c7ac1dc 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py @@ -86,6 +86,15 @@ async def _main( additional_inputs=gr.State(value={}, render=False), examples=[ ["Generate a monthly sales performance report for the last year with available data."], + ["Generate a report of the top 5 selling products per year and the total sales for each product."], + ["Generate a report of the average order value per invoice per month."], + ["Generate a report with the month-over-month trends in sales."], + ["Generate a report on sales revenue by country per year."], + ["Generate a report on the 5 highest-value customers per year vs. the average customer."], + [ + "Generate a report on the average amount spent by one time buyers for each year vs. the average customer." + ], + ["Generate a report on the daily, weekly and monthly sales trends."], ], title="2.1: ReAct for Retrieval-Augmented Generation with OpenAI Agent SDK", ) From 3458565d1a3f6bc26598d8773bab6bff45a6b8d8 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 22 Jan 2026 14:39:47 -0500 Subject: [PATCH 06/10] Generating xlsx reports --- .gitignore | 3 +- .../report_generation/async_client_manager.py | 58 ++++++++++++++++++- .../agent_evals/report_generation/main.py | 13 +++-- pyproject.toml | 1 + uv.lock | 23 ++++++++ 5 files changed, 91 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 0b28586..78c1eae 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,5 @@ wheels/ .env .gradio -aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db +aieng-eval-agents/aieng/agent_evals/report_generation/data/*.db +aieng-eval-agents/aieng/agent_evals/report_generation/reports/* diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py index 77da0de..7bf93f2 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py @@ -6,18 +6,25 @@ """ import sqlite3 +import urllib.parse +from pathlib import Path from typing import Any +import pandas as pd from aieng.agent_evals.report_generation.utils import Configs from openai import AsyncOpenAI from weaviate.client import WeaviateAsyncClient +SQLITE_DB_PATH = Path("aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db") +REPORTS_OUTPUT_PATH = Path("aieng-eval-agents/aieng/agent_evals/report_generation/reports/") + + class SQLiteConnection: """SQLite connection.""" def __init__(self) -> None: - self._connection = sqlite3.connect("aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db") + self._connection = sqlite3.connect(SQLITE_DB_PATH) def execute(self, query: str) -> list[Any]: """Execute a SQLite query. @@ -37,6 +44,33 @@ def close(self) -> None: self._connection.close() +class ReportFileWriter: + """Write reports to a file.""" + + def write_report_to_file( + self, report_data: list[Any], report_columns: list[str], filename: str = "report.xlsx" + ) -> str: + """Write a report to a XLSX file. + + Args: + report_data: The data of the report + report_columns: The columns of the report + filename: The name of the file to create. Default is "report.xlsx". + + Returns + ------- + The URL link to the report file. + """ + # Create reports directory if it doesn't exist + REPORTS_OUTPUT_PATH.mkdir(exist_ok=True) + filepath = REPORTS_OUTPUT_PATH / filename + + report_df = pd.DataFrame(report_data, columns=report_columns) + report_df.to_excel(filepath, index=False) + + return _make_gradio_file_link(filepath) + + class AsyncClientManager: """Manages async client lifecycle with lazy initialization and cleanup. @@ -79,6 +113,7 @@ def __init__(self, configs: Configs | None = None) -> None: self._weaviate_client: WeaviateAsyncClient | None = None self._openai_client: AsyncOpenAI | None = None self._sqlite_connection: SQLiteConnection | None = None + self._report_file_writer: ReportFileWriter | None = None self._initialized: bool = False @property @@ -104,6 +139,14 @@ def sqlite_connection(self) -> SQLiteConnection: self._initialized = True return self._sqlite_connection + @property + def report_file_writer(self) -> ReportFileWriter: + """Get or create ReportFileWriter.""" + if self._report_file_writer is None: + self._report_file_writer = ReportFileWriter() + self._initialized = True + return self._report_file_writer + async def close(self) -> None: """Close all initialized async clients.""" if self._openai_client is not None: @@ -119,3 +162,16 @@ async def close(self) -> None: def is_initialized(self) -> bool: """Check if any clients have been initialized.""" return self._initialized + + +def _make_gradio_file_link(filepath: Path) -> str: + """Make a Gradio file link from a filepath. + + Args: + filepath: The path to the file. + + Returns + ------- + A Gradio file link. + """ + return f"gradio_api/file={urllib.parse.quote(str(filepath), safe='')}" diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py index c7ac1dc..5000d8a 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py @@ -6,7 +6,7 @@ import agents import gradio as gr -from aieng.agent_evals.report_generation.async_client_manager import AsyncClientManager +from aieng.agent_evals.report_generation.async_client_manager import REPORTS_OUTPUT_PATH, AsyncClientManager from aieng.agent_evals.report_generation.utils import ( get_or_create_session, oai_agent_stream_to_gradio_messages, @@ -21,8 +21,8 @@ If the SQL query did not return intended results, try again. \ For best performance, divide complex queries into simpler sub-queries. \ Do not make up information. \ -For facts that might change over time, you must use the search tool to retrieve the \ -most up-to-date information. +When the report is done, use the report file writer tool to write it to a file. \ +At the end, provide a link to the report file to the user. """ @@ -51,7 +51,10 @@ async def _main( # We wrap the `search_knowledgebase` method with `function_tool`, which # will construct the tool definition JSON schema by extracting the necessary # information from the method signature and docstring. - tools=[agents.function_tool(client_manager.sqlite_connection.execute)], + tools=[ + agents.function_tool(client_manager.sqlite_connection.execute), + agents.function_tool(client_manager.report_file_writer.write_report_to_file), + ], model=agents.OpenAIChatCompletionsModel( model=client_manager.configs.default_worker_model, openai_client=client_manager.openai_client, @@ -100,6 +103,6 @@ async def _main( ) try: - demo.launch(share=False) + demo.launch(share=False, allowed_paths=[REPORTS_OUTPUT_PATH.absolute()]) finally: asyncio.run(AsyncClientManager.get_instance().close()) diff --git a/pyproject.toml b/pyproject.toml index 84c2685..81fef54 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "scikit-learn>=1.7.0", "weaviate-client>=4.18.3", "urllib3>=2.6.3", + "openpyxl>=3.1.5", ] [dependency-groups] diff --git a/uv.lock b/uv.lock index 1b858aa..5743f7b 100644 --- a/uv.lock +++ b/uv.lock @@ -30,6 +30,7 @@ dependencies = [ { name = "numpy" }, { name = "openai" }, { name = "openai-agents" }, + { name = "openpyxl" }, { name = "plotly" }, { name = "pydantic" }, { name = "pydantic-ai-slim", extra = ["logfire"] }, @@ -88,6 +89,7 @@ requires-dist = [ { name = "numpy", specifier = "<2.3.0" }, { name = "openai", specifier = ">=2.8.1" }, { name = "openai-agents", specifier = ">=0.6.1" }, + { name = "openpyxl", specifier = ">=3.1.5" }, { name = "plotly", specifier = ">=6.5.0" }, { name = "pydantic", specifier = ">=2.12.4" }, { name = "pydantic-ai-slim", extras = ["logfire"], specifier = ">=1.26.0" }, @@ -1060,6 +1062,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/de/15/545e2b6cf2e3be84bc1ed85613edd75b8aea69807a71c26f4ca6a9258e82/email_validator-2.3.0-py3-none-any.whl", hash = "sha256:80f13f623413e6b197ae73bb10bf4eb0908faf509ad8362c5edeb0be7fd450b4", size = 35604, upload-time = "2025-08-26T13:09:05.858Z" }, ] +[[package]] +name = "et-xmlfile" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234, upload-time = "2024-10-25T17:25:40.039Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059, upload-time = "2024-10-25T17:25:39.051Z" }, +] + [[package]] name = "executing" version = "2.2.0" @@ -3172,6 +3183,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/11/53/d8076306f324992c79e9b2ee597f2ce863f0ac5d1fd24e6ad88f2a4dcbc0/openai_agents-0.6.1-py3-none-any.whl", hash = "sha256:7bde01c8d2fd723b0c72c9b207dcfeb12a8d211078f5d259945fb163a6f52b89", size = 237609, upload-time = "2025-11-20T01:17:06.454Z" }, ] +[[package]] +name = "openpyxl" +version = "3.1.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "et-xmlfile" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464, upload-time = "2024-06-28T14:03:44.161Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" }, +] + [[package]] name = "opentelemetry-api" version = "1.38.0" From 37b40004c56d1115fadac2d25c09a5cc33130300 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Fri, 23 Jan 2026 12:27:23 -0500 Subject: [PATCH 07/10] Movign files around, adding the ddl file and the import script --- .../async_client_manager.py | 60 +++++++----- .../utils.py => configs.py} | 96 +------------------ .../agent_evals/report_generation/README.md | 37 ++++++- .../report_generation/data/OnlineRetail.ddl | 10 ++ ..._dates.py => import_online_retail_data.py} | 85 ++++++++-------- .../agent_evals/report_generation/main.py | 13 ++- aieng-eval-agents/aieng/agent_evals/utils.py | 96 +++++++++++++++++++ 7 files changed, 230 insertions(+), 167 deletions(-) rename aieng-eval-agents/aieng/agent_evals/{report_generation => }/async_client_manager.py (74%) rename aieng-eval-agents/aieng/agent_evals/{report_generation/utils.py => configs.py} (54%) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.ddl rename aieng-eval-agents/aieng/agent_evals/report_generation/data/{convert_dates.py => import_online_retail_data.py} (52%) create mode 100644 aieng-eval-agents/aieng/agent_evals/utils.py diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py b/aieng-eval-agents/aieng/agent_evals/async_client_manager.py similarity index 74% rename from aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py rename to aieng-eval-agents/aieng/agent_evals/async_client_manager.py index 7bf93f2..7d79b4b 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/async_client_manager.py +++ b/aieng-eval-agents/aieng/agent_evals/async_client_manager.py @@ -5,26 +5,30 @@ hot-reload process. """ +import os import sqlite3 import urllib.parse from pathlib import Path from typing import Any import pandas as pd -from aieng.agent_evals.report_generation.utils import Configs +from aieng.agent_evals.configs import Configs from openai import AsyncOpenAI from weaviate.client import WeaviateAsyncClient -SQLITE_DB_PATH = Path("aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db") -REPORTS_OUTPUT_PATH = Path("aieng-eval-agents/aieng/agent_evals/report_generation/reports/") +# Will use these as default if no path is provided in the +# REPORT_GENERATION_DB_PATH and REPORTS_OUTPUT_PATH env vars +DEFAULT_SQLITE_DB_PATH = Path("aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db") +DEFAULT_REPORTS_OUTPUT_PATH = Path("aieng-eval-agents/aieng/agent_evals/report_generation/reports/") class SQLiteConnection: """SQLite connection.""" def __init__(self) -> None: - self._connection = sqlite3.connect(SQLITE_DB_PATH) + db_path = os.getenv("REPORT_GENERATION_DB_PATH", DEFAULT_SQLITE_DB_PATH) + self._connection = sqlite3.connect(db_path) def execute(self, query: str) -> list[Any]: """Execute a SQLite query. @@ -48,7 +52,11 @@ class ReportFileWriter: """Write reports to a file.""" def write_report_to_file( - self, report_data: list[Any], report_columns: list[str], filename: str = "report.xlsx" + self, + report_data: list[Any], + report_columns: list[str], + filename: str = "report.xlsx", + gradio_link: bool = True, ) -> str: """Write a report to a XLSX file. @@ -56,19 +64,40 @@ def write_report_to_file( report_data: The data of the report report_columns: The columns of the report filename: The name of the file to create. Default is "report.xlsx". + gradio_link: Whether to return a file link that works with Gradio UI. + Default is True. Returns ------- - The URL link to the report file. + The path to the report file. If `gradio_link` is True, will return + a URL link that allows Gradio UI to donwload the file. """ # Create reports directory if it doesn't exist - REPORTS_OUTPUT_PATH.mkdir(exist_ok=True) - filepath = REPORTS_OUTPUT_PATH / filename + reports_output_path = self.get_reports_output_path() + reports_output_path.mkdir(exist_ok=True) + filepath = reports_output_path / filename report_df = pd.DataFrame(report_data, columns=report_columns) report_df.to_excel(filepath, index=False) - return _make_gradio_file_link(filepath) + file_uri = str(filepath) + if gradio_link: + file_uri = f"gradio_api/file={urllib.parse.quote(str(file_uri), safe='')}" + + return file_uri + + @staticmethod + def get_reports_output_path() -> Path: + """Get the reports output path. + + If no path is provided in the REPORTS_OUTPUT_PATH env var, will use the + default path in DEFAULT_REPORTS_OUTPUT_PATH. + + Returns + ------- + The reports output path. + """ + return Path(os.getenv("REPORTS_OUTPUT_PATH", DEFAULT_REPORTS_OUTPUT_PATH)) class AsyncClientManager: @@ -162,16 +191,3 @@ async def close(self) -> None: def is_initialized(self) -> bool: """Check if any clients have been initialized.""" return self._initialized - - -def _make_gradio_file_link(filepath: Path) -> str: - """Make a Gradio file link from a filepath. - - Args: - filepath: The path to the file. - - Returns - ------- - A Gradio file link. - """ - return f"gradio_api/file={urllib.parse.quote(str(filepath), safe='')}" diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py b/aieng-eval-agents/aieng/agent_evals/configs.py similarity index 54% rename from aieng-eval-agents/aieng/agent_evals/report_generation/utils.py rename to aieng-eval-agents/aieng/agent_evals/configs.py index 8bafdbb..c3cc425 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/utils.py +++ b/aieng-eval-agents/aieng/agent_evals/configs.py @@ -1,103 +1,9 @@ -"""Utility functions for the report generation agent.""" +"""Configuration settings for the agent evals.""" -import uuid -from typing import Any - -from agents import SQLiteSession, StreamEvent, stream_events -from agents.items import ToolCallOutputItem -from gradio.components.chatbot import ChatMessage, MetadataDict -from openai.types.responses import ResponseFunctionToolCall, ResponseOutputText -from openai.types.responses.response_completed_event import ResponseCompletedEvent -from openai.types.responses.response_output_message import ResponseOutputMessage from pydantic import AliasChoices, Field from pydantic_settings import BaseSettings, SettingsConfigDict -def oai_agent_stream_to_gradio_messages(stream_event: StreamEvent) -> list[ChatMessage]: - """Parse agent sdk "stream event" into a list of gr messages. - - Adds extra data for tool use to make the gradio display informative. - """ - output: list[ChatMessage] = [] - - if isinstance(stream_event, stream_events.RawResponsesStreamEvent): - data = stream_event.data - if isinstance(data, ResponseCompletedEvent): - # The completed event may contain multiple output messages, - # including tool calls and final outputs. - # If there is at least one tool call, we mark the response as a thought. - is_thought = len(data.response.output) > 1 and any( - isinstance(message, ResponseFunctionToolCall) for message in data.response.output - ) - - for message in data.response.output: - if isinstance(message, ResponseOutputMessage): - for _item in message.content: - if isinstance(_item, ResponseOutputText): - output.append( - ChatMessage( - role="assistant", - content=_item.text, - metadata={ - "title": "🧠 Thought", - "id": data.sequence_number, - } - if is_thought - else MetadataDict(), - ) - ) - elif isinstance(message, ResponseFunctionToolCall): - output.append( - ChatMessage( - role="assistant", - content=f"```\n{message.arguments}\n```", - metadata={ - "title": f"🛠️ Used tool `{message.name}`", - }, - ) - ) - - elif isinstance(stream_event, stream_events.RunItemStreamEvent): - name = stream_event.name - item = stream_event.item - - if name == "tool_output" and isinstance(item, ToolCallOutputItem): - output.append( - ChatMessage( - role="assistant", - content=f"```\n{item.output}\n```", - metadata={ - "title": "*Tool call output*", - "status": "done", # This makes it collapsed by default - }, - ) - ) - - return output - - -def get_or_create_session( - history: list[ChatMessage], - session_state: dict[str, Any], -) -> SQLiteSession: - """Get existing session or create a new one for conversation persistence. - - Args: - history: The history of the conversation. - session_state: The state of the session. - - Returns - ------- - The session. - """ - if len(history) == 0: - session = SQLiteSession(session_id=str(uuid.uuid4())) - session_state["session"] = session - else: - session = session_state["session"] - return session - - class Configs(BaseSettings): """Configuration settings loaded from environment variables. diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md index 9166f4c..39dea3e 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md @@ -1,17 +1,48 @@ # Report Generation Agent +This code implements an example of a Report Generation Agent for single-table relational +data source. + +The data source implemented here is [SQLite](https://sqlite.org/) which is supported +natively by Python and saves the data in disk. + +The Report Generation Agent will provide an UI to read user queries in natural language +and procceeds to make SQL queries to the database. At the end, the Agent will provide +a downloadable link to the report as an .XLSX file. + ## Dataset -https://archive.ics.uci.edu/dataset/352/online+retail +The dataset we are using in this example is the +[Online Retail](https://archive.ics.uci.edu/dataset/352/online+retail) dataset. It contains +information about invoices for products that were purchased by customers, which also includes +product quantity, the invoice date and country that the user resides in. For a more +detailed data structure, please check the [OnlineRetail.ddl](data/Online%20Retail.ddl) file. + +## Importing the Data + +To import the data, pleasde download the dataset file from the link below and save it to your +file system. -To import it into weaviate: +https://archive.ics.uci.edu/static/public/352/online+retail.zip + +You can import the dataset to the database by running the script below: ```bash -uv run --env-file .env python -m aieng.agent_evals.report_generation.data_import +uv run --env-file .env python -m aieng.agent_evals.report_generation.data.import_online_retail_data --dataset-path ``` +Replace `` with the path the dataset's .CSV file is saved in your machine. + +***NOTE:*** You can configure the location the database is saved by setting the path to +an environment variable named `REPORT_GENERATION_DB_PATH`. + ## Running +To run the agent, please execute: + ```bash uv run --env-file .env python -m aieng.agent_evals.report_generation.main ``` + +The agent will be available in a [Gradio](https://www.gradio.app/) web UI under the +local address http://127.0.0.1:7860 which can be accessed on your preferred browser. diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.ddl b/aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.ddl new file mode 100644 index 0000000..fab35a3 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.ddl @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS "sales" ( + "InvoiceNo" INTEGER, + "StockCode" TEXT, + "Description" TEXT, + "Quantity" INTEGER, + "InvoiceDate" TEXT, + "UnitPrice" REAL, + "CustomerID" INTEGER, + "Country" TEXT +); diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/data/convert_dates.py b/aieng-eval-agents/aieng/agent_evals/report_generation/data/import_online_retail_data.py similarity index 52% rename from aieng-eval-agents/aieng/agent_evals/report_generation/data/convert_dates.py rename to aieng-eval-agents/aieng/agent_evals/report_generation/data/import_online_retail_data.py index 24f0c91..69e7832 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/data/convert_dates.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/data/import_online_retail_data.py @@ -1,17 +1,53 @@ -"""Convert InvoiceDate format in OnlineRetail.db. - -Convert from 'MM/DD/YY HH:MM' to 'YYYY-MM-DD HH:MM' for better searching abilities. -""" +"""Import the Online Retail dataset to a SQLite database.""" import logging +import os import sqlite3 from datetime import datetime +from pathlib import Path + +import click +import pandas as pd +from dotenv import load_dotenv -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") logger = logging.getLogger(__name__) +load_dotenv() + +# Will use this as default if no path is provided in the +# REPORT_GENERATION_DB_PATH env var +DEFAULT_DB_PATH = Path("aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db") + -DB_PATH = "aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.db" +@click.command() +@click.option("--dataset-path", required=True, help="OnlieRetail dataset CSV path.") +def main(dataset_path: str): + """Import the Online Retail dataset to the database. + + Args: + dataset_path: The path to the CSV file containing the dataset. + """ + db_path = os.getenv("REPORT_GENERATION_DB_PATH", DEFAULT_DB_PATH) + + assert Path(dataset_path).exists(), f"Dataset path {dataset_path} does not exist" + assert Path(db_path).parent.exists(), f"Database path {db_path} does not exist" + + conn = sqlite3.connect(db_path) + logger.info("Creating tables according to the OnlineRetail.ddl file") + + with open(Path("aieng-eval-agents/aieng/agent_evals/report_generation/data/OnlineRetail.ddl"), "r") as file: + conn.executescript(file.read()) + conn.commit() + + logger.info(f"Importing dataset from {dataset_path} to database at {db_path}") + + df = pd.read_csv(dataset_path) + df["InvoiceDate"] = df["InvoiceDate"].apply(convert_date) + df.to_sql("sales", conn, if_exists="append", index=False) + + conn.close() + logger.info(f"Dataset imported successfully to database at {db_path}") def convert_date(date_str: str) -> str | None: @@ -59,42 +95,5 @@ def convert_date(date_str: str) -> str | None: return None -def main(): - """Convert all InvoiceDate values in the database.""" - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() - - # Get all rows with InvoiceDate - cursor.execute("SELECT rowid, InvoiceDate FROM sales WHERE InvoiceDate IS NOT NULL AND InvoiceDate != ''") - rows = cursor.fetchall() - - logger.info(f"Found {len(rows)} rows with InvoiceDate to convert") - - updated_count = 0 - error_count = 0 - - for rowid, old_date in rows: - new_date = convert_date(old_date) - if new_date: - try: - cursor.execute("UPDATE sales SET InvoiceDate = ? WHERE rowid = ?", (new_date, rowid)) - updated_count += 1 - if updated_count % 100 == 0: - logger.info(f"Updated {updated_count} rows...") - except Exception as e: - logger.error(f"Error updating rowid {rowid}: {e}") - error_count += 1 - else: - logger.warning(f"Could not convert date for rowid {rowid}: {old_date}") - error_count += 1 - - conn.commit() - conn.close() - - logger.info("Conversion complete!") - logger.info(f" Updated: {updated_count} rows") - logger.info(f" Errors: {error_count} rows") - - if __name__ == "__main__": main() diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py index 5000d8a..8f994b0 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/main.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/main.py @@ -6,8 +6,8 @@ import agents import gradio as gr -from aieng.agent_evals.report_generation.async_client_manager import REPORTS_OUTPUT_PATH, AsyncClientManager -from aieng.agent_evals.report_generation.utils import ( +from aieng.agent_evals.async_client_manager import AsyncClientManager, ReportFileWriter +from aieng.agent_evals.utils import ( get_or_create_session, oai_agent_stream_to_gradio_messages, ) @@ -15,6 +15,8 @@ from gradio.components.chatbot import ChatMessage +load_dotenv() + REACT_INSTRUCTIONS = """\ Perform the task using the SQLite database tool. \ EACH TIME before invoking the function, you must explain your reasons for doing so. \ @@ -22,7 +24,7 @@ For best performance, divide complex queries into simpler sub-queries. \ Do not make up information. \ When the report is done, use the report file writer tool to write it to a file. \ -At the end, provide a link to the report file to the user. +At the end, provide the report file as a downloadable hyperlink to the user. """ @@ -103,6 +105,9 @@ async def _main( ) try: - demo.launch(share=False, allowed_paths=[REPORTS_OUTPUT_PATH.absolute()]) + demo.launch( + share=False, + allowed_paths=[ReportFileWriter.get_reports_output_path().absolute()], + ) finally: asyncio.run(AsyncClientManager.get_instance().close()) diff --git a/aieng-eval-agents/aieng/agent_evals/utils.py b/aieng-eval-agents/aieng/agent_evals/utils.py new file mode 100644 index 0000000..7cecc7d --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/utils.py @@ -0,0 +1,96 @@ +"""Utility functions for the report generation agent.""" + +import uuid +from typing import Any + +from agents import SQLiteSession, StreamEvent, stream_events +from agents.items import ToolCallOutputItem +from gradio.components.chatbot import ChatMessage, MetadataDict +from openai.types.responses import ResponseFunctionToolCall, ResponseOutputText +from openai.types.responses.response_completed_event import ResponseCompletedEvent +from openai.types.responses.response_output_message import ResponseOutputMessage + + +def oai_agent_stream_to_gradio_messages(stream_event: StreamEvent) -> list[ChatMessage]: + """Parse agent sdk "stream event" into a list of gr messages. + + Adds extra data for tool use to make the gradio display informative. + """ + output: list[ChatMessage] = [] + + if isinstance(stream_event, stream_events.RawResponsesStreamEvent): + data = stream_event.data + if isinstance(data, ResponseCompletedEvent): + # The completed event may contain multiple output messages, + # including tool calls and final outputs. + # If there is at least one tool call, we mark the response as a thought. + is_thought = len(data.response.output) > 1 and any( + isinstance(message, ResponseFunctionToolCall) for message in data.response.output + ) + + for message in data.response.output: + if isinstance(message, ResponseOutputMessage): + for _item in message.content: + if isinstance(_item, ResponseOutputText): + output.append( + ChatMessage( + role="assistant", + content=_item.text, + metadata={ + "title": "🧠 Thought", + "id": data.sequence_number, + } + if is_thought + else MetadataDict(), + ) + ) + elif isinstance(message, ResponseFunctionToolCall): + output.append( + ChatMessage( + role="assistant", + content=f"```\n{message.arguments}\n```", + metadata={ + "title": f"🛠️ Used tool `{message.name}`", + }, + ) + ) + + elif isinstance(stream_event, stream_events.RunItemStreamEvent): + name = stream_event.name + item = stream_event.item + + if name == "tool_output" and isinstance(item, ToolCallOutputItem): + output.append( + ChatMessage( + role="assistant", + content=f"```\n{item.output}\n```", + metadata={ + "title": "*Tool call output*", + "status": "done", # This makes it collapsed by default + }, + ) + ) + + return output + + +def get_or_create_session( + history: list[ChatMessage], + session_state: dict[str, Any], +) -> SQLiteSession: + """Get existing session or create a new one for conversation persistence. + + Args: + history: The history of the conversation. + session_state: The state of the session. + + Returns + ------- + The session. + """ + if len(history) == 0: + session = SQLiteSession(session_id=str(uuid.uuid4())) + session_state["session"] = session + else: + session = session_state["session"] + return session From 6e3c4c239a07f5a2f2b77efb440f840b9cd892ae Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Fri, 23 Jan 2026 12:35:08 -0500 Subject: [PATCH 08/10] One more readme paragraph --- .../aieng/agent_evals/report_generation/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md index 39dea3e..78b544f 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md @@ -46,3 +46,6 @@ uv run --env-file .env python -m aieng.agent_evals.report_generation.main The agent will be available in a [Gradio](https://www.gradio.app/) web UI under the local address http://127.0.0.1:7860 which can be accessed on your preferred browser. + +On the UI, there will be a few examples of requests you can make for this agent. It also +features a text input so you can make your own report requests to it. From 530360e5a60bf7dc8a8e125daf99e2c2b5f1628a Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Fri, 23 Jan 2026 12:42:50 -0500 Subject: [PATCH 09/10] Adding a couple more vulnerabilities to the skip list --- .github/workflows/code_checks.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/code_checks.yml b/.github/workflows/code_checks.yml index 776b4e8..887f59e 100644 --- a/.github/workflows/code_checks.yml +++ b/.github/workflows/code_checks.yml @@ -54,5 +54,10 @@ jobs: uses: pypa/gh-action-pip-audit@1220774d901786e6f652ae159f7b6bc8fea6d266 with: virtual-environment: .venv/ + # Skipping one nbconvert vulnerability that has no fix version + # Skipping one orjson vulnerability that has no fix version + # Skipping one protobuf vulnerability that has no fix version ignore-vulns: | GHSA-xm59-rqc7-hhvf + GHSA-hx9q-6w63-j58v + GHSA-7gcm-g887-7qv7 From dc02ff2654d5ac36424a40713a8484fdc315e69f Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Fri, 23 Jan 2026 12:56:56 -0500 Subject: [PATCH 10/10] Grammar fixes --- .../aieng/agent_evals/report_generation/README.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md index 78b544f..7af8aa0 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/README.md +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/README.md @@ -7,12 +7,13 @@ The data source implemented here is [SQLite](https://sqlite.org/) which is suppo natively by Python and saves the data in disk. The Report Generation Agent will provide an UI to read user queries in natural language -and procceeds to make SQL queries to the database. At the end, the Agent will provide -a downloadable link to the report as an .XLSX file. +and procceed to make SQL queries to the database in order to produce the data for +the report. At the end, the Agent will provide a downloadable link to the report as +an `.xlsx` file. ## Dataset -The dataset we are using in this example is the +The dataset used in this example is the [Online Retail](https://archive.ics.uci.edu/dataset/352/online+retail) dataset. It contains information about invoices for products that were purchased by customers, which also includes product quantity, the invoice date and country that the user resides in. For a more @@ -44,8 +45,8 @@ To run the agent, please execute: uv run --env-file .env python -m aieng.agent_evals.report_generation.main ``` -The agent will be available in a [Gradio](https://www.gradio.app/) web UI under the -local address http://127.0.0.1:7860 which can be accessed on your preferred browser. +The agent will be available through a [Gradio](https://www.gradio.app/) web UI under the +local address http://127.0.0.1:7860, which can be accessed on your preferred browser. -On the UI, there will be a few examples of requests you can make for this agent. It also +On the UI, there will be a few examples of requests you can make to this agent. It also features a text input so you can make your own report requests to it.