diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 86740a622..0e2d15590 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -384,6 +384,11 @@ class InstrumentorConfig(TypedDict): "min_version": "0.1.0", "package_name": "google-adk", # Actual pip package name }, + "agno": { + "module_name": "agentops.instrumentation.agno", + "class_name": "AgnoInstrumentor", + "min_version": "0.1.0", + }, } # Combine all target packages for monitoring diff --git a/agentops/instrumentation/agno/__init__.py b/agentops/instrumentation/agno/__init__.py new file mode 100644 index 000000000..c6c04a7fc --- /dev/null +++ b/agentops/instrumentation/agno/__init__.py @@ -0,0 +1,18 @@ +"""Agno Agent instrumentation package.""" + +import logging + +from .instrumentor import AgnoInstrumentor + +logger = logging.getLogger(__name__) + +__version__ = "1.0.0" + +LIBRARY_NAME = "agno" +LIBRARY_VERSION = __version__ + +__all__ = [ + "AgnoInstrumentor", + "LIBRARY_NAME", + "LIBRARY_VERSION", +] diff --git a/agentops/instrumentation/agno/attributes/__init__.py b/agentops/instrumentation/agno/attributes/__init__.py new file mode 100644 index 000000000..377f465a7 --- /dev/null +++ b/agentops/instrumentation/agno/attributes/__init__.py @@ -0,0 +1,14 @@ +"""Agno Agent attributes package for span instrumentation.""" + +from .agent import get_agent_run_attributes +from .team import get_team_run_attributes +from .tool import get_tool_execution_attributes +from .workflow import get_workflow_run_attributes, get_workflow_session_attributes + +__all__ = [ + "get_agent_run_attributes", + "get_team_run_attributes", + "get_tool_execution_attributes", + "get_workflow_run_attributes", + "get_workflow_session_attributes", +] diff --git a/agentops/instrumentation/agno/attributes/agent.py b/agentops/instrumentation/agno/attributes/agent.py new file mode 100644 index 000000000..5d0f3181c --- /dev/null +++ b/agentops/instrumentation/agno/attributes/agent.py @@ -0,0 +1,290 @@ +"""Agno Agent run attributes handler.""" + +from typing import Optional, Tuple, Dict, Any + +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, WorkflowAttributes, AgentAttributes, ToolAttributes +from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind + + +def get_agent_run_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract span attributes for Agent.run/arun calls. + + Args: + args: Positional arguments passed to the run method (self, message, ...) + kwargs: Keyword arguments passed to the run method + return_value: The return value from the run method (RunResponse) + + Returns: + A dictionary of span attributes to be set on the agent span + """ + attributes: AttributeMap = {} + + # Initialize variables to avoid UnboundLocalError + agent_name = None + + # Base attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW + attributes[SpanAttributes.LLM_SYSTEM] = "agno" + attributes[SpanAttributes.LLM_REQUEST_STREAMING] = "False" + + # AgentOps entity attributes (matching CrewAI pattern) + attributes[SpanAttributes.AGENTOPS_ENTITY_NAME] = "Agent" + + # Extract agent information from args[0] (self) + if args and len(args) >= 1: + agent = args[0] + + # Core agent identification using AgentAttributes + if hasattr(agent, "agent_id") and agent.agent_id: + agent_id = str(agent.agent_id) + attributes[AgentAttributes.AGENT_ID] = agent_id + attributes["agno.agent.id"] = agent_id + + if hasattr(agent, "name") and agent.name: + agent_name = str(agent.name) + attributes[AgentAttributes.AGENT_NAME] = agent_name + attributes["agno.agent.name"] = agent_name + + if hasattr(agent, "role") and agent.role: + agent_role = str(agent.role) + attributes[AgentAttributes.AGENT_ROLE] = agent_role + attributes["agno.agent.role"] = agent_role + + # Check if agent is part of a team + if hasattr(agent, "_team") and agent._team: + team = agent._team + if hasattr(team, "name") and team.name: + attributes["agno.agent.parent_team"] = str(team.name) + attributes["agno.agent.parent_team_display"] = f"Under {team.name}" + if hasattr(team, "team_id") and team.team_id: + attributes["agno.agent.parent_team_id"] = str(team.team_id) + + # Model information using AgentAttributes + if hasattr(agent, "model") and agent.model: + model = agent.model + if hasattr(model, "id"): + model_id = str(model.id) + attributes[AgentAttributes.AGENT_MODELS] = model_id + attributes["agno.agent.model_id"] = model_id + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = model_id + + if hasattr(model, "provider"): + model_provider = str(model.provider) + attributes["agno.agent.model_provider"] = model_provider + attributes[SpanAttributes.LLM_REQUEST_MODEL] = model_id if hasattr(model, "id") else "unknown" + + # Agent configuration details + agent_config = {} + + if hasattr(agent, "description") and agent.description: + agent_config["description"] = str(agent.description)[:500] # Limit length + + if hasattr(agent, "goal") and agent.goal: + agent_config["goal"] = str(agent.goal)[:500] # Limit length + + if hasattr(agent, "instructions") and agent.instructions: + if isinstance(agent.instructions, list): + agent_config["instructions"] = " | ".join(str(i) for i in agent.instructions[:3]) # First 3 + else: + agent_config["instructions"] = str(agent.instructions)[:500] + + if hasattr(agent, "expected_output") and agent.expected_output: + agent_config["expected_output"] = str(agent.expected_output)[:300] + + if hasattr(agent, "markdown"): + agent_config["markdown"] = str(agent.markdown) + + if hasattr(agent, "reasoning"): + agent_config["reasoning"] = str(agent.reasoning) + + if hasattr(agent, "stream"): + agent_config["stream"] = str(agent.stream) + + if hasattr(agent, "retries"): + agent_config["max_retry_limit"] = str(agent.retries) + + if hasattr(agent, "response_model") and agent.response_model: + agent_config[SpanAttributes.LLM_RESPONSE_MODEL] = str(agent.response_model.__name__) + + if hasattr(agent, "show_tool_calls"): + agent_config["show_tool_calls"] = str(agent.show_tool_calls) + + if hasattr(agent, "tool_call_limit") and agent.tool_call_limit: + agent_config["tool_call_limit"] = str(agent.tool_call_limit) + + # Add agent config to attributes + for key, value in agent_config.items(): + attributes[f"agno.agent.{key}"] = value + + # Tools information + if hasattr(agent, "tools") and agent.tools: + tools_info = [] + tool_names = [] + + for tool in agent.tools: + tool_dict = {} + + if hasattr(tool, "name"): + tool_name = str(tool.name) + tool_dict["name"] = tool_name + tool_names.append(tool_name) + elif hasattr(tool, "__name__"): + tool_name = str(tool.__name__) + tool_dict["name"] = tool_name + tool_names.append(tool_name) + elif callable(tool): + tool_name = getattr(tool, "__name__", "unknown_tool") + tool_dict["name"] = tool_name + tool_names.append(tool_name) + + if hasattr(tool, "description"): + description = str(tool.description) + if len(description) > 200: + description = description[:197] + "..." + tool_dict["description"] = description + + if tool_dict: # Only add if we have some info + tools_info.append(tool_dict) + + # Set tool attributes + if tool_names: + attributes["agno.agent.tools_count"] = str(len(tool_names)) + + if tools_info: + # Instead of storing as JSON blob, set individual tool attributes + for i, tool in enumerate(tools_info): + prefix = f"agent.tool.{i}" + if "name" in tool: + attributes[f"{prefix}.{ToolAttributes.TOOL_NAME}"] = tool["name"] + if "description" in tool: + attributes[f"{prefix}.{ToolAttributes.TOOL_DESCRIPTION}"] = tool["description"] + + # Memory and knowledge information + if hasattr(agent, "memory") and agent.memory: + memory_type = type(agent.memory).__name__ + attributes["agno.agent.memory_type"] = memory_type + + if hasattr(agent, "knowledge") and agent.knowledge: + knowledge_type = type(agent.knowledge).__name__ + attributes["agno.agent.knowledge_type"] = knowledge_type + + if hasattr(agent, "storage") and agent.storage: + storage_type = type(agent.storage).__name__ + attributes["agno.agent.storage_type"] = storage_type + + # Session information + if hasattr(agent, "session_id") and agent.session_id: + session_id = str(agent.session_id) + attributes["agno.agent.session_id"] = session_id + + if hasattr(agent, "user_id") and agent.user_id: + user_id = str(agent.user_id) + attributes["agno.agent.user_id"] = user_id + + # Extract run input information + if args and len(args) >= 2: + message = args[1] # The message argument + if message: + message_str = str(message) + if len(message_str) > 500: + message_str = message_str[:497] + "..." + attributes[WorkflowAttributes.WORKFLOW_INPUT] = message_str + attributes["agno.agent.input"] = message_str + # AgentOps entity input (matching CrewAI pattern) + attributes[SpanAttributes.AGENTOPS_ENTITY_INPUT] = message_str + + # Extract kwargs information + if kwargs: + if kwargs.get("stream") is not None: + attributes[SpanAttributes.LLM_REQUEST_STREAMING] = str(kwargs["stream"]) + + if kwargs.get("session_id"): + attributes["agno.agent.run_session_id"] = str(kwargs["session_id"]) + + if kwargs.get("user_id"): + attributes["agno.agent.run_user_id"] = str(kwargs["user_id"]) + + # Extract return value information + if return_value: + if hasattr(return_value, "run_id") and return_value.run_id: + run_id = str(return_value.run_id) + attributes["agno.agent.run_id"] = run_id + + if hasattr(return_value, "session_id") and return_value.session_id: + session_id = str(return_value.session_id) + attributes["agno.agent.response_session_id"] = session_id + + if hasattr(return_value, "agent_id") and return_value.agent_id: + agent_id = str(return_value.agent_id) + attributes["agno.agent.response_agent_id"] = agent_id + + if hasattr(return_value, "content") and return_value.content: + content = str(return_value.content) + if len(content) > 500: + content = content[:497] + "..." + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = content + attributes["agno.agent.output"] = content + + if hasattr(return_value, "event") and return_value.event: + event = str(return_value.event) + attributes["agno.agent.event"] = event + + # Tool executions from the response + if hasattr(return_value, "tools") and return_value.tools: + tool_executions = [] + for tool_exec in return_value.tools: + tool_exec_dict = {} + + if hasattr(tool_exec, "tool_name") and tool_exec.tool_name: + tool_exec_dict["name"] = str(tool_exec.tool_name) + + if hasattr(tool_exec, "tool_args") and tool_exec.tool_args: + try: + import json + + args_str = json.dumps(tool_exec.tool_args) + if len(args_str) > 200: + args_str = args_str[:197] + "..." + tool_exec_dict["parameters"] = args_str + except: + tool_exec_dict["parameters"] = str(tool_exec.tool_args) + + if hasattr(tool_exec, "result") and tool_exec.result: + result_str = str(tool_exec.result) + if len(result_str) > 200: + result_str = result_str[:197] + "..." + tool_exec_dict["result"] = result_str + + if hasattr(tool_exec, "tool_call_error") and tool_exec.tool_call_error: + tool_exec_dict["error"] = str(tool_exec.tool_call_error) + + tool_exec_dict["status"] = "success" # Default to success + + if tool_exec_dict: + tool_executions.append(tool_exec_dict) + + if tool_executions: + # Add tool executions (limit to first 3) + limited_executions = tool_executions[:3] + for i, tool_exec in enumerate(limited_executions): + for key, value in tool_exec.items(): + attributes[f"agno.agent.tool_execution.{i}.{key}"] = value + + # Workflow type + attributes[WorkflowAttributes.WORKFLOW_TYPE] = "agent_run" + + # Add display name for better UI visualization + if agent_name: + # Check if we have parent team info + parent_team = attributes.get("agno.agent.parent_team") + if parent_team: + attributes["agno.agent.display_name"] = f"{agent_name} (Agent under {parent_team})" + else: + attributes["agno.agent.display_name"] = f"{agent_name} (Agent)" + + return attributes diff --git a/agentops/instrumentation/agno/attributes/metrics.py b/agentops/instrumentation/agno/attributes/metrics.py new file mode 100644 index 000000000..b8d3a9ac1 --- /dev/null +++ b/agentops/instrumentation/agno/attributes/metrics.py @@ -0,0 +1,231 @@ +"""Agno Agent session metrics attributes handler.""" + +from typing import Optional, Tuple, Dict, Any + +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes + + +def get_metrics_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract span attributes for Agent._set_session_metrics calls. + + Args: + args: Positional arguments passed to the _set_session_metrics method (self, run_messages) + kwargs: Keyword arguments passed to the _set_session_metrics method + return_value: The return value from the _set_session_metrics method + + Returns: + A dictionary of span attributes to be set on the metrics span + """ + attributes: AttributeMap = {} + + # Base attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = "llm" + attributes[SpanAttributes.LLM_SYSTEM] = "agno" + attributes[SpanAttributes.AGENTOPS_ENTITY_NAME] = "LLM" + + # Initialize usage tracking variables (but don't set attributes yet) + usage_data = {} + + # Initialize counters for indexed messages + prompt_count = 0 + completion_count = 0 + + # Extract agent and run_messages from args (self, run_messages) + if args and len(args) >= 2: + agent = args[0] # self (Agent instance) + run_messages = args[1] # RunMessages object + + # Add agent display name for LLM calls + if hasattr(agent, "name") and agent.name: + attributes["agno.llm.display_name"] = f"{agent.name} → LLM" + + # Model information - get additional request parameters if available + if hasattr(agent, "model") and agent.model: + model = agent.model + # Set model ID first + if hasattr(model, "id"): + attributes[SpanAttributes.LLM_REQUEST_MODEL] = str(model.id) + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = str(model.id) + # Additional model parameters + if hasattr(model, "temperature") and model.temperature is not None: + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = str(model.temperature) + if hasattr(model, "max_tokens") and model.max_tokens is not None: + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = str(model.max_tokens) + if hasattr(model, "top_p") and model.top_p is not None: + attributes[SpanAttributes.LLM_REQUEST_TOP_P] = str(model.top_p) + if hasattr(model, "provider"): + attributes["agno.model.provider"] = str(model.provider) + + # Add model class name for better identification (with null check) + if hasattr(model, "__class__") and hasattr(model.__class__, "__name__"): + model_class = model.__class__.__name__ + attributes["agno.model.class"] = model_class + + # === EXTRACT CONVERSATION STRUCTURE === + if hasattr(run_messages, "messages") and run_messages.messages: + messages = run_messages.messages + + # Initialize token tracking + total_prompt_tokens = 0 + total_completion_tokens = 0 + total_output_tokens = 0 + total_input_tokens = 0 + total_tokens = 0 + total_time = 0.0 + + # Process messages to create individual indexed gen_ai.prompt.{i} and gen_ai.completion.{i} attributes + for i, msg in enumerate(messages): + # Extract message content for prompts/completions + if hasattr(msg, "role") and hasattr(msg, "content"): + # Only set content if it's not None/empty + if msg.content is not None and str(msg.content).strip() != "" and str(msg.content) != "None": + content = str(msg.content) + # Truncate very long content to avoid oversized attributes + if len(content) > 1000: + content = content[:997] + "..." + + if msg.role == "user": + attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "user" + attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.content"] = content + prompt_count += 1 + elif msg.role == "assistant": + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_count}.role"] = "assistant" + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_count}.content"] = content + completion_count += 1 + elif msg.role == "system": + attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "system" + attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.content"] = content + prompt_count += 1 + else: + # For messages with None content, still set the role but skip content + if msg.role == "user": + attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "user" + prompt_count += 1 + elif msg.role == "assistant": + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_count}.role"] = "assistant" + completion_count += 1 + elif msg.role == "system": + attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "system" + prompt_count += 1 + + # Extract token metrics from message + if hasattr(msg, "metrics") and msg.metrics: + metrics = msg.metrics + + # Handle different token metric patterns + if hasattr(metrics, "prompt_tokens") and metrics.prompt_tokens > 0: + total_prompt_tokens += metrics.prompt_tokens + if hasattr(metrics, "completion_tokens") and metrics.completion_tokens > 0: + total_completion_tokens += metrics.completion_tokens + if hasattr(metrics, "total_tokens") and metrics.total_tokens > 0: + total_tokens += metrics.total_tokens + # For messages that only have output_tokens (like Anthropic) + if hasattr(metrics, "output_tokens") and metrics.output_tokens > 0: + total_output_tokens += metrics.output_tokens + if hasattr(metrics, "input_tokens") and metrics.input_tokens > 0: + total_input_tokens += metrics.input_tokens + if hasattr(metrics, "time") and metrics.time: + total_time += metrics.time + + # === TOKEN METRICS FROM AGENT SESSION METRICS === + if hasattr(agent, "session_metrics") and agent.session_metrics: + session_metrics = agent.session_metrics + + # Try to get model name from session metrics if not already set + if SpanAttributes.LLM_REQUEST_MODEL not in attributes: + if hasattr(session_metrics, "model") and session_metrics.model: + model_id = str(session_metrics.model) + attributes[SpanAttributes.LLM_REQUEST_MODEL] = model_id + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = model_id + + # Only set token variables if the attributes actually exist + session_prompt_tokens = None + session_completion_tokens = None + session_output_tokens = None + session_input_tokens = None + session_total_tokens = None + + if hasattr(session_metrics, "prompt_tokens"): + session_prompt_tokens = session_metrics.prompt_tokens + + if hasattr(session_metrics, "completion_tokens"): + session_completion_tokens = session_metrics.completion_tokens + + if hasattr(session_metrics, "output_tokens"): + session_output_tokens = session_metrics.output_tokens + + if hasattr(session_metrics, "input_tokens"): + session_input_tokens = session_metrics.input_tokens + + if hasattr(session_metrics, "total_tokens"): + session_total_tokens = session_metrics.total_tokens + + # For Anthropic, output_tokens represents completion tokens + if session_output_tokens is not None and session_output_tokens > 0: + if session_completion_tokens is None or session_completion_tokens == 0: + session_completion_tokens = session_output_tokens + + # For some providers, input_tokens represents prompt tokens + if session_input_tokens is not None and session_input_tokens > 0: + if session_prompt_tokens is None or session_prompt_tokens == 0: + session_prompt_tokens = session_input_tokens + + # Only set token attributes if we have actual values + if session_total_tokens is not None and session_total_tokens > 0: + usage_data["total_tokens"] = session_total_tokens + + # Set breakdown if available + if session_prompt_tokens is not None and session_prompt_tokens > 0: + usage_data["prompt_tokens"] = session_prompt_tokens + if session_completion_tokens is not None and session_completion_tokens > 0: + usage_data["completion_tokens"] = session_completion_tokens + + # Additional token types from session metrics - only set if present + if hasattr(session_metrics, "cached_tokens") and session_metrics.cached_tokens > 0: + usage_data["cache_read_input_tokens"] = session_metrics.cached_tokens + if hasattr(session_metrics, "reasoning_tokens") and session_metrics.reasoning_tokens > 0: + usage_data["reasoning_tokens"] = session_metrics.reasoning_tokens + + # === FALLBACK TO MESSAGE AGGREGATION IF SESSION METRICS ARE EMPTY === + # If we don't have token data from session metrics, try message aggregation + if "total_tokens" not in usage_data: + # Set aggregated token usage from messages + if total_prompt_tokens > 0 or total_input_tokens > 0: + usage_data["prompt_tokens"] = total_prompt_tokens or total_input_tokens + if total_completion_tokens > 0 or total_output_tokens > 0: + usage_data["completion_tokens"] = total_completion_tokens or total_output_tokens + if total_tokens > 0: + usage_data["total_tokens"] = total_tokens + + # Extract user message info if available + if hasattr(run_messages, "user_message") and run_messages.user_message: + user_msg = run_messages.user_message + if hasattr(user_msg, "content"): + content = str(user_msg.content) + if len(content) > 1000: + content = content[:997] + "..." + attributes["agno.metrics.user_input"] = content + + # Set individual LLM usage attributes only for values we actually have + if "prompt_tokens" in usage_data: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage_data["prompt_tokens"] + if "completion_tokens" in usage_data: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage_data["completion_tokens"] + if "total_tokens" in usage_data: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage_data["total_tokens"] + if "cache_read_input_tokens" in usage_data: + attributes[SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS] = usage_data["cache_read_input_tokens"] + if "reasoning_tokens" in usage_data: + attributes[SpanAttributes.LLM_USAGE_REASONING_TOKENS] = usage_data["reasoning_tokens"] + + # But only if we have any usage data + if usage_data: + for key, value in usage_data.items(): + attributes[f"gen_ai.usage.{key}"] = value + + return attributes diff --git a/agentops/instrumentation/agno/attributes/team.py b/agentops/instrumentation/agno/attributes/team.py new file mode 100644 index 000000000..b3cc68081 --- /dev/null +++ b/agentops/instrumentation/agno/attributes/team.py @@ -0,0 +1,325 @@ +"""Agno Team run attributes handler.""" + +from typing import Optional, Tuple, Dict, Any + +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, WorkflowAttributes +from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind + + +def get_team_run_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract span attributes for Team._run method calls. + + Args: + args: Positional arguments passed to the Team._run method + kwargs: Keyword arguments passed to the Team._run method + return_value: The return value from the Team._run method + + Returns: + A dictionary of span attributes to be set on the workflow span + """ + attributes: AttributeMap = {} + + # Base attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW + attributes[SpanAttributes.LLM_SYSTEM] = "agno" + attributes[WorkflowAttributes.WORKFLOW_TYPE] = "team_run" + + # Extract team information from instance + if args and len(args) > 0: + team = args[0] # self (Team instance) + + # Team identification + if hasattr(team, "name") and team.name: + attributes["agno.team.name"] = str(team.name) + attributes["agno.team.display_name"] = f"{team.name} (Team)" + + if hasattr(team, "team_id") and team.team_id: + attributes["agno.team.team_id"] = str(team.team_id) + + if hasattr(team, "mode") and team.mode: + attributes["agno.team.mode"] = str(team.mode) + + if hasattr(team, "members") and team.members: + attributes["agno.team.members_count"] = str(len(team.members)) + + # Add detailed member information + member_agents = [] + for i, member in enumerate(team.members): + member_info = {} + if hasattr(member, "name") and member.name: + member_info["name"] = str(member.name) + if hasattr(member, "agent_id") and member.agent_id: + member_info["id"] = str(member.agent_id) + if hasattr(member, "role") and member.role: + member_info["role"] = str(member.role) + if hasattr(member, "model") and member.model: + if hasattr(member.model, "id"): + member_info["model"] = str(member.model.id) + + # Add member info to list + if member_info: + member_agents.append(member_info) + + # Also add individual member attributes + for key, value in member_info.items(): + attributes[f"agno.team.member.{i}.{key}"] = value + + # Add aggregated member list + if member_agents: + import json + + try: + attributes["agno.team.members"] = json.dumps(member_agents) + # Also add a simple list of member names + member_names = [m.get("name", "Unknown") for m in member_agents] + attributes["agno.team.member_names"] = ", ".join(member_names) + except: + attributes["agno.team.members"] = str(member_agents) + + # Process input arguments from the run_messages parameter + if args and len(args) >= 2: + # args[0] is run_response, args[1] is run_messages + run_messages = args[1] + if hasattr(run_messages, "messages") and run_messages.messages: + # Get the user message for workflow input + user_messages = [msg for msg in run_messages.messages if hasattr(msg, "role") and msg.role == "user"] + if user_messages: + last_user_msg = user_messages[-1] + if hasattr(last_user_msg, "content"): + attributes[WorkflowAttributes.WORKFLOW_INPUT] = str(last_user_msg.content) + attributes[WorkflowAttributes.WORKFLOW_INPUT_TYPE] = "message" + + # Count total messages + attributes["agno.team.messages_count"] = str(len(run_messages.messages)) + + # Process keyword arguments + if kwargs: + if kwargs.get("user_id"): + attributes[SpanAttributes.LLM_USER] = kwargs["user_id"] + + if kwargs.get("session_id"): + attributes["agno.team.session_id"] = kwargs["session_id"] + + if kwargs.get("response_format"): + attributes["agno.team.response_format"] = str(type(kwargs["response_format"]).__name__) + + # Process return value (TeamRunResponse) + if return_value: + if hasattr(return_value, "content"): + content = str(return_value.content) + # Truncate if too long + if len(content) > 1000: + content = content[:997] + "..." + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = content + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = "team_run_response" + else: + output = str(return_value) + if len(output) > 1000: + output = output[:997] + "..." + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = output + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = type(return_value).__name__ + + # Set additional team response attributes + if hasattr(return_value, "run_id"): + attributes["agno.team.run_id"] = str(return_value.run_id) + + if hasattr(return_value, "session_id"): + attributes["agno.team.response_session_id"] = str(return_value.session_id) + + if hasattr(return_value, "team_id"): + attributes["agno.team.response_team_id"] = str(return_value.team_id) + + if hasattr(return_value, "model"): + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = str(return_value.model) + + if hasattr(return_value, "model_provider"): + attributes["agno.team.model_provider"] = str(return_value.model_provider) + + if hasattr(return_value, "event"): + attributes["agno.team.event"] = str(return_value.event) + + # Team-specific attributes + if hasattr(return_value, "content_type"): + attributes["agno.team.response_content_type"] = str(return_value.content_type) + + return attributes + + +def get_team_public_run_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract span attributes for Team.run method calls (public API). + + Args: + args: Positional arguments passed to the Team.run method (self, message, ...) + kwargs: Keyword arguments passed to the Team.run method + return_value: The return value from the Team.run method + + Returns: + A dictionary of span attributes to be set on the workflow span + """ + attributes: AttributeMap = {} + + # Base attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW + attributes[SpanAttributes.LLM_SYSTEM] = "agno" + attributes[WorkflowAttributes.WORKFLOW_TYPE] = "team_run" + + # Extract team information from instance + if args and len(args) > 0: + team = args[0] # self (Team instance) + + # Team identification + if hasattr(team, "name") and team.name: + attributes["agno.team.name"] = str(team.name) + attributes["agno.team.display_name"] = f"{team.name} (Team)" + + if hasattr(team, "team_id") and team.team_id: + attributes["agno.team.team_id"] = str(team.team_id) + + if hasattr(team, "mode") and team.mode: + attributes["agno.team.mode"] = str(team.mode) + + if hasattr(team, "members") and team.members: + attributes["agno.team.members_count"] = str(len(team.members)) + + # Add detailed member information + member_agents = [] + for i, member in enumerate(team.members): + member_info = {} + if hasattr(member, "name") and member.name: + member_info["name"] = str(member.name) + if hasattr(member, "agent_id") and member.agent_id: + member_info["id"] = str(member.agent_id) + if hasattr(member, "role") and member.role: + member_info["role"] = str(member.role) + if hasattr(member, "model") and member.model: + if hasattr(member.model, "id"): + member_info["model"] = str(member.model.id) + + # Add member info to list + if member_info: + member_agents.append(member_info) + + # Also add individual member attributes + for key, value in member_info.items(): + attributes[f"agno.team.member.{i}.{key}"] = value + + # Add aggregated member list + if member_agents: + import json + + try: + attributes["agno.team.members"] = json.dumps(member_agents) + # Also add a simple list of member names + member_names = [m.get("name", "Unknown") for m in member_agents] + attributes["agno.team.member_names"] = ", ".join(member_names) + except: + attributes["agno.team.members"] = str(member_agents) + + # Process input arguments from Team.run() method + if args and len(args) >= 2: + # args[0] is self (Team instance), args[1] is message + message = args[1] + + # Extract workflow input from message + if message is not None: + if isinstance(message, str): + message_content = message + elif hasattr(message, "content"): + message_content = str(message.content) + elif hasattr(message, "get_content_string"): + message_content = message.get_content_string() + else: + message_content = str(message) + + # Truncate if too long + if len(message_content) > 1000: + message_content = message_content[:997] + "..." + attributes[WorkflowAttributes.WORKFLOW_INPUT] = message_content + attributes[WorkflowAttributes.WORKFLOW_INPUT_TYPE] = "message" + + # Process keyword arguments + if kwargs: + if kwargs.get("user_id"): + attributes[SpanAttributes.LLM_USER] = kwargs["user_id"] + + if kwargs.get("session_id"): + attributes["agno.team.session_id"] = kwargs["session_id"] + + if kwargs.get("stream"): + attributes["agno.team.streaming"] = str(kwargs["stream"]) + + if kwargs.get("stream_intermediate_steps"): + attributes["agno.team.stream_intermediate_steps"] = str(kwargs["stream_intermediate_steps"]) + + if kwargs.get("retries"): + attributes["agno.team.retries"] = str(kwargs["retries"]) + + # Media attachments + if kwargs.get("audio"): + attributes["agno.team.has_audio"] = "true" + if kwargs.get("images"): + attributes["agno.team.has_images"] = "true" + if kwargs.get("videos"): + attributes["agno.team.has_videos"] = "true" + if kwargs.get("files"): + attributes["agno.team.has_files"] = "true" + + if kwargs.get("knowledge_filters"): + attributes["agno.team.has_knowledge_filters"] = "true" + + # Process return value (TeamRunResponse or Iterator) + if return_value: + # Handle both single response and iterator + if hasattr(return_value, "__iter__") and not isinstance(return_value, str): + # It's an iterator for streaming + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = "team_run_response_stream" + attributes["agno.team.is_streaming"] = "true" + elif hasattr(return_value, "content"): + # It's a TeamRunResponse + content = str(return_value.content) + # Truncate if too long + if len(content) > 1000: + content = content[:997] + "..." + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = content + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = "team_run_response" + + # Set additional team response attributes + if hasattr(return_value, "run_id"): + attributes["agno.team.run_id"] = str(return_value.run_id) + + if hasattr(return_value, "session_id"): + attributes["agno.team.response_session_id"] = str(return_value.session_id) + + if hasattr(return_value, "team_id"): + attributes["agno.team.response_team_id"] = str(return_value.team_id) + + if hasattr(return_value, "model"): + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = str(return_value.model) + + if hasattr(return_value, "model_provider"): + attributes["agno.team.model_provider"] = str(return_value.model_provider) + + if hasattr(return_value, "event"): + attributes["agno.team.event"] = str(return_value.event) + + # Team-specific attributes + if hasattr(return_value, "content_type"): + attributes["agno.team.response_content_type"] = str(return_value.content_type) + else: + # Unknown return type + output = str(return_value) + if len(output) > 1000: + output = output[:997] + "..." + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = output + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = type(return_value).__name__ + + return attributes diff --git a/agentops/instrumentation/agno/attributes/tool.py b/agentops/instrumentation/agno/attributes/tool.py new file mode 100644 index 000000000..92f9e1ad9 --- /dev/null +++ b/agentops/instrumentation/agno/attributes/tool.py @@ -0,0 +1,241 @@ +"""Agno tool execution attributes handler.""" + +import json +from typing import Optional, Tuple, Dict, Any + +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes +from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind +from agentops.semconv.tool import ToolAttributes + + +def get_tool_execution_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract span attributes for tool execution calls (FunctionCall.execute/aexecute). + + Args: + args: Positional arguments passed to the execute method (self) + kwargs: Keyword arguments passed to the execute method + return_value: The return value from the execute method (FunctionExecutionResult) + + Returns: + A dictionary of span attributes to be set on the tool execution span + """ + attributes: AttributeMap = {} + + # Base attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.TOOL + attributes[SpanAttributes.LLM_SYSTEM] = "agno" + attributes["agno.tool.operation"] = "execute" + + # Process the FunctionCall object (self in execute method) + if args and len(args) > 0: + function_call = args[0] + + # Add detailed function call information + attributes["agno.tool.function_call_type"] = str(type(function_call).__name__) + + # Extract tool information + if hasattr(function_call, "function") and function_call.function: + function = function_call.function + + # Get function name and add display name + if hasattr(function, "__name__"): + func_name = function.__name__ + attributes["agno.tool.function_name"] = func_name + attributes["agno.tool.display_name"] = f"{func_name} (Tool)" + + tool_name = getattr(function, "name", "unknown_tool") + + # Set span attributes for the tool execution span + attributes[ToolAttributes.TOOL_NAME] = tool_name + attributes["agno.tool.function_name"] = tool_name + + # Function details and context + if hasattr(function, "description"): + description = getattr(function, "description", "") + if description: + attributes[ToolAttributes.TOOL_DESCRIPTION] = description + attributes["agno.tool.function_description"] = description + + # Function source information + if hasattr(function, "entrypoint") and function.entrypoint: + entrypoint = function.entrypoint + if hasattr(entrypoint, "__module__"): + attributes["agno.tool.function_module"] = str(entrypoint.__module__) + if hasattr(entrypoint, "__name__"): + attributes["agno.tool.function_method"] = str(entrypoint.__name__) + if hasattr(entrypoint, "__qualname__"): + attributes["agno.tool.function_qualname"] = str(entrypoint.__qualname__) + + # Tool capabilities + if hasattr(function, "requires_confirmation"): + attributes["agno.tool.requires_confirmation"] = str(function.requires_confirmation) + if hasattr(function, "show_result"): + attributes["agno.tool.show_result"] = str(function.show_result) + if hasattr(function, "stop_after_tool_call"): + attributes["agno.tool.stop_after_tool_call"] = str(function.stop_after_tool_call) + + # Extract tool arguments with better formatting + if hasattr(function_call, "arguments") and function_call.arguments: + try: + if isinstance(function_call.arguments, str): + args_dict = json.loads(function_call.arguments) + else: + args_dict = function_call.arguments + + # Format arguments nicely + formatted_args = [] + for key, value in args_dict.items(): + value_str = str(value) + formatted_args.append(f"{key}={value_str}") + + attributes[ToolAttributes.TOOL_PARAMETERS] = json.dumps(args_dict) + attributes["agno.tool.formatted_args"] = ", ".join(formatted_args) + attributes["agno.tool.args_count"] = str(len(args_dict)) + except Exception as e: + attributes[ToolAttributes.TOOL_PARAMETERS] = str(function_call.arguments) + attributes["agno.tool.args_parse_error"] = str(e) + + # Extract call ID and metadata + if hasattr(function_call, "tool_call_id"): + attributes["agno.tool.call_id"] = str(function_call.tool_call_id) + + # Check for any agent context + if hasattr(function_call, "_agent") and function_call._agent: + agent = function_call._agent + if hasattr(agent, "name"): + attributes["agno.tool.calling_agent_name"] = str(agent.name) + if hasattr(agent, "agent_id"): + attributes["agno.tool.calling_agent_id"] = str(agent.agent_id) + + # Process return value + if return_value is not None: + # Add timing information + import time + + attributes["agno.tool.execution_timestamp"] = str(int(time.time() * 1000)) + + # Determine execution status and result information + if hasattr(return_value, "value"): + # FunctionExecutionResult with value + result_value = return_value.value + attributes["agno.tool.execution_status"] = "success" + else: + # Direct return value + result_value = return_value + attributes["agno.tool.execution_status"] = "success" + + # Process result value + if result_value is not None: + result_type = type(result_value).__name__ + attributes["agno.tool.execution_result_status"] = str(result_type) + + # Handle FunctionExecutionResult objects specifically + if hasattr(result_value, "status") and hasattr(result_value, "result"): + # This looks like a FunctionExecutionResult + status = getattr(result_value, "status", "unknown") + actual_result = getattr(result_value, "result", None) + error = getattr(result_value, "error", None) + + attributes["agno.tool.execution_result_status"] = str(status) + attributes[ToolAttributes.TOOL_STATUS] = str(status) + + if error: + attributes["agno.tool.execution_error"] = str(error) + attributes["tool.error"] = str(error) + + if actual_result is not None: + actual_result_type = type(actual_result).__name__ + attributes["agno.tool.actual_result_type"] = actual_result_type + + # Enhanced generator handling + if hasattr(actual_result, "__iter__") and hasattr(actual_result, "__next__"): + attributes["agno.tool.result_is_generator"] = "true" + + # Try to get more meaningful information about the generator + generator_info = [] + + # Get function name from the generator + if hasattr(actual_result, "gi_code"): + func_name = actual_result.gi_code.co_name + attributes["agno.tool.generator_function"] = func_name + generator_info.append(f"function={func_name}") + + # Get local variables from generator frame for context + if hasattr(actual_result, "gi_frame") and actual_result.gi_frame: + try: + locals_dict = actual_result.gi_frame.f_locals + # Look for interesting variables that give context + context_vars = [ + "task_description", + "expected_output", + "member_agent", + "agent_name", + "team", + "message", + ] + for var_name in context_vars: + if var_name in locals_dict: + value = str(locals_dict[var_name]) + generator_info.append(f"{var_name}={value}") + attributes[f"agno.tool.generator_{var_name}"] = value + + # Count total local variables for debugging + attributes["agno.tool.generator_locals_count"] = str(len(locals_dict)) + except Exception as e: + attributes["agno.tool.generator_locals_error"] = str(e) + + # Try to identify what type of transfer this is + generator_str = str(actual_result) + if "transfer_task_to_member" in generator_str: + attributes["agno.tool.transfer_type"] = "task_to_member" + elif "transfer" in generator_str.lower(): + attributes["agno.tool.transfer_type"] = "general_transfer" + + if generator_info: + result_str = f"Generator<{actual_result_type}>({', '.join(generator_info)})" + else: + result_str = f"Generator<{actual_result_type}> - {str(actual_result)}" + else: + # Regular result + result_str = str(actual_result) + else: + result_str = str(status) + else: + # Not a FunctionExecutionResult, handle as direct result + if hasattr(result_value, "__iter__") and hasattr(result_value, "__next__"): + # It's a generator + attributes["agno.tool.result_is_generator"] = "true" + + if hasattr(result_value, "gi_code"): + func_name = result_value.gi_code.co_name + attributes["agno.tool.generator_function"] = func_name + result_str = f"Generator<{result_type}> function={func_name} - {str(result_value)}" + else: + result_str = f"Generator<{result_type}> - {str(result_value)}" + else: + # Regular result + result_str = str(result_value) + else: + result_str = "None" + + # Set the main result attribute + attributes[ToolAttributes.TOOL_RESULT] = result_str + + # Add additional analysis attributes + attributes["agno.tool.result_length"] = str(len(result_str)) + + # Set final execution status + if not attributes.get(ToolAttributes.TOOL_STATUS): + attributes[ToolAttributes.TOOL_STATUS] = "success" + + # Add execution summary for debugging + tool_name = attributes.get(ToolAttributes.TOOL_NAME, "unknown") + call_type = attributes.get("agno.tool.transfer_type", "unknown") + attributes["agno.tool.execution_summary"] = f"Tool '{tool_name}' executed with type '{call_type}'" + + return attributes diff --git a/agentops/instrumentation/agno/attributes/workflow.py b/agentops/instrumentation/agno/attributes/workflow.py new file mode 100644 index 000000000..384cb616b --- /dev/null +++ b/agentops/instrumentation/agno/attributes/workflow.py @@ -0,0 +1,198 @@ +"""Workflow attribute extraction for agno workflow instrumentation.""" + +from typing import Any, Dict, Optional, Tuple +from opentelemetry.util.types import AttributeValue + +from agentops.semconv.instrumentation import InstrumentationAttributes +from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind +from agentops.semconv.workflow import WorkflowAttributes +from agentops.instrumentation.common.attributes import get_common_attributes + + +def get_workflow_run_attributes( + args: Tuple[Any, ...] = (), + kwargs: Optional[Dict[str, Any]] = None, + return_value: Optional[Any] = None, +) -> Dict[str, AttributeValue]: + """Extract attributes from workflow run operations. + + Args: + args: Positional arguments passed to the workflow run method + kwargs: Keyword arguments passed to the workflow run method + return_value: Return value from the workflow run method + + Returns: + Dictionary of OpenTelemetry attributes for workflow runs + """ + attributes = get_common_attributes() + kwargs = kwargs or {} + + if args and len(args) > 0: + workflow = args[0] + + # Core workflow attributes + if hasattr(workflow, "name") and workflow.name: + attributes[WorkflowAttributes.WORKFLOW_NAME] = str(workflow.name) + if hasattr(workflow, "workflow_id") and workflow.workflow_id: + attributes[WorkflowAttributes.WORKFLOW_ID] = str(workflow.workflow_id) + if hasattr(workflow, "description") and workflow.description: + attributes[WorkflowAttributes.WORKFLOW_DESCRIPTION] = str(workflow.description) + if hasattr(workflow, "app_id") and workflow.app_id: + attributes[WorkflowAttributes.WORKFLOW_APP_ID] = str(workflow.app_id) + + # Set workflow type + attributes[WorkflowAttributes.WORKFLOW_TYPE] = "agno_workflow" + + # Session and user attributes + if hasattr(workflow, "session_id") and workflow.session_id: + attributes[WorkflowAttributes.WORKFLOW_SESSION_ID] = str(workflow.session_id) + if hasattr(workflow, "session_name") and workflow.session_name: + attributes[WorkflowAttributes.WORKFLOW_SESSION_NAME] = str(workflow.session_name) + if hasattr(workflow, "user_id") and workflow.user_id: + attributes[WorkflowAttributes.WORKFLOW_USER_ID] = str(workflow.user_id) + + # Run-specific attributes + if hasattr(workflow, "run_id") and workflow.run_id: + attributes[WorkflowAttributes.WORKFLOW_RUN_ID] = str(workflow.run_id) + + # Configuration attributes + if hasattr(workflow, "debug_mode"): + attributes[WorkflowAttributes.WORKFLOW_DEBUG_MODE] = bool(workflow.debug_mode) + if hasattr(workflow, "monitoring"): + attributes[WorkflowAttributes.WORKFLOW_MONITORING] = bool(workflow.monitoring) + if hasattr(workflow, "telemetry"): + attributes[WorkflowAttributes.WORKFLOW_TELEMETRY] = bool(workflow.telemetry) + + # Memory and storage attributes + if hasattr(workflow, "memory") and workflow.memory: + memory_type = type(workflow.memory).__name__ + attributes[WorkflowAttributes.WORKFLOW_MEMORY_TYPE] = memory_type + + if hasattr(workflow, "storage") and workflow.storage: + storage_type = type(workflow.storage).__name__ + attributes[WorkflowAttributes.WORKFLOW_STORAGE_TYPE] = storage_type + + # Input parameters from kwargs + if kwargs: + # Store workflow input + attributes[WorkflowAttributes.WORKFLOW_INPUT] = str(kwargs) + + # Count and types of input parameters + attributes[WorkflowAttributes.WORKFLOW_INPUT_PARAMETER_COUNT] = len(kwargs) + param_types = list(set(type(v).__name__ for v in kwargs.values())) + if param_types: + attributes[WorkflowAttributes.WORKFLOW_INPUT_TYPE] = str(param_types) + + # Store specific input keys (without values for privacy) + input_keys = list(kwargs.keys()) + if input_keys: + attributes[WorkflowAttributes.WORKFLOW_INPUT_PARAMETER_KEYS] = str(input_keys) + + # Workflow method parameters if available + if hasattr(workflow, "_run_parameters") and workflow._run_parameters: + param_count = len(workflow._run_parameters) + attributes[WorkflowAttributes.WORKFLOW_METHOD_PARAMETER_COUNT] = param_count + + if hasattr(workflow, "_run_return_type") and workflow._run_return_type: + attributes[WorkflowAttributes.WORKFLOW_METHOD_RETURN_TYPE] = str(workflow._run_return_type) + + # Process return value attributes + if return_value is not None: + return_type = type(return_value).__name__ + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = return_type + + # Handle RunResponse objects + if hasattr(return_value, "content"): + # Store workflow output + if return_value.content: + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = str(return_value.content) + + if hasattr(return_value, "content_type"): + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_CONTENT_TYPE] = str(return_value.content_type) + if hasattr(return_value, "event"): + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_EVENT] = str(return_value.event) + if hasattr(return_value, "model"): + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_MODEL] = ( + str(return_value.model) if return_value.model else "" + ) + if hasattr(return_value, "model_provider"): + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_MODEL_PROVIDER] = ( + str(return_value.model_provider) if return_value.model_provider else "" + ) + + # Count various response components + if hasattr(return_value, "messages") and return_value.messages: + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_MESSAGE_COUNT] = len(return_value.messages) + if hasattr(return_value, "tools") and return_value.tools: + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TOOL_COUNT] = len(return_value.tools) + if hasattr(return_value, "images") and return_value.images: + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_IMAGE_COUNT] = len(return_value.images) + if hasattr(return_value, "videos") and return_value.videos: + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_VIDEO_COUNT] = len(return_value.videos) + if hasattr(return_value, "audio") and return_value.audio: + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_AUDIO_COUNT] = len(return_value.audio) + + # Handle generators/iterators + elif hasattr(return_value, "__iter__") and not isinstance(return_value, (str, bytes)): + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_IS_STREAMING] = True + + # Set span kind - AgentOpsSpanKind.WORKFLOW is already a string + attributes[InstrumentationAttributes.INSTRUMENTATION_TYPE] = AgentOpsSpanKind.WORKFLOW + + return attributes + + +def get_workflow_session_attributes( + args: Tuple[Any, ...] = (), + kwargs: Optional[Dict[str, Any]] = None, + return_value: Optional[Any] = None, +) -> Dict[str, AttributeValue]: + """Extract attributes from workflow session operations. + + Args: + args: Positional arguments passed to the session method + kwargs: Keyword arguments passed to the session method + return_value: Return value from the session method + + Returns: + Dictionary of OpenTelemetry attributes for workflow sessions + """ + attributes = get_common_attributes() + kwargs = kwargs or {} + + if args and len(args) > 0: + workflow = args[0] + + # Session attributes + if hasattr(workflow, "session_id") and workflow.session_id: + attributes[WorkflowAttributes.WORKFLOW_SESSION_ID] = str(workflow.session_id) + if hasattr(workflow, "session_name") and workflow.session_name: + attributes[WorkflowAttributes.WORKFLOW_SESSION_NAME] = str(workflow.session_name) + if hasattr(workflow, "workflow_id") and workflow.workflow_id: + attributes[WorkflowAttributes.WORKFLOW_SESSION_WORKFLOW_ID] = str(workflow.workflow_id) + if hasattr(workflow, "user_id") and workflow.user_id: + attributes[WorkflowAttributes.WORKFLOW_SESSION_USER_ID] = str(workflow.user_id) + + # Session state attributes + if hasattr(workflow, "session_state") and workflow.session_state: + if isinstance(workflow.session_state, dict): + attributes[WorkflowAttributes.WORKFLOW_SESSION_STATE_KEYS] = str(list(workflow.session_state.keys())) + attributes[WorkflowAttributes.WORKFLOW_SESSION_STATE_SIZE] = len(workflow.session_state) + + # Storage attributes + if hasattr(workflow, "storage") and workflow.storage: + storage_type = type(workflow.storage).__name__ + attributes[WorkflowAttributes.WORKFLOW_SESSION_STORAGE_TYPE] = storage_type + + # Process session return value if it's a WorkflowSession + if return_value is not None and hasattr(return_value, "session_id"): + attributes[WorkflowAttributes.WORKFLOW_SESSION_RETURNED_SESSION_ID] = str(return_value.session_id) + if hasattr(return_value, "created_at") and return_value.created_at: + attributes[WorkflowAttributes.WORKFLOW_SESSION_CREATED_AT] = int(return_value.created_at) + if hasattr(return_value, "updated_at") and return_value.updated_at: + attributes[WorkflowAttributes.WORKFLOW_SESSION_UPDATED_AT] = int(return_value.updated_at) + + # Set span kind - AgentOpsSpanKind.WORKFLOW is already a string + attributes[InstrumentationAttributes.INSTRUMENTATION_TYPE] = AgentOpsSpanKind.WORKFLOW + + return attributes diff --git a/agentops/instrumentation/agno/instrumentor.py b/agentops/instrumentation/agno/instrumentor.py new file mode 100644 index 000000000..2755dab9d --- /dev/null +++ b/agentops/instrumentation/agno/instrumentor.py @@ -0,0 +1,1078 @@ +"""Agno Agent Instrumentation for AgentOps + +This module provides instrumentation for the Agno Agent library, implementing OpenTelemetry +instrumentation for agent workflows and LLM model calls. + +We focus on instrumenting the following key endpoints: +- Agent.run/arun - Main agent workflow execution (sync/async) +- Team._run/_arun - Team workflow execution (sync/async) +- Team._run_stream/_arun_stream - Team streaming workflow execution (sync/async) +- FunctionCall.execute/aexecute - Tool execution when agents call tools (sync/async) +- Agent._run_tool/_arun_tool - Agent internal tool execution (sync/async) +- Agent._set_session_metrics - Session metrics capture for token usage and timing +- Workflow.run_workflow/arun_workflow - Workflow execution (sync/async) +- Workflow session management methods - Session lifecycle operations + +This provides clean visibility into agent workflows and actual tool usage with proper +parent-child span relationships. +""" + +from typing import List, Collection, Any, Optional +from opentelemetry.trace import get_tracer +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.metrics import get_meter +from opentelemetry import trace, context as otel_context +from opentelemetry.trace import Status, StatusCode +from wrapt import wrap_function_wrapper +import threading + +from agentops.logging import logger +from agentops.semconv import Meters +from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap + +# Import attribute handlers +from agentops.instrumentation.agno.attributes.agent import get_agent_run_attributes +from agentops.instrumentation.agno.attributes.team import get_team_run_attributes +from agentops.instrumentation.agno.attributes.tool import get_tool_execution_attributes +from agentops.instrumentation.agno.attributes.metrics import get_metrics_attributes +from agentops.instrumentation.agno.attributes.workflow import ( + get_workflow_run_attributes, + get_workflow_session_attributes, +) + +# Library info for tracer/meter +LIBRARY_NAME = "agentops.instrumentation.agno" +LIBRARY_VERSION = "0.1.0" + + +class StreamingContextManager: + """Manages span contexts for streaming agent and workflow executions.""" + + def __init__(self): + self._contexts = {} # context_id -> (span_context, span) + self._agent_sessions = {} # session_id -> agent_id mapping for context lookup + self._lock = threading.Lock() + + def store_context(self, context_id: str, span_context: Any, span: Any) -> None: + """Store span context for streaming execution.""" + with self._lock: + self._contexts[context_id] = (span_context, span) + + def get_context(self, context_id: str) -> Optional[tuple]: + """Retrieve stored span context.""" + with self._lock: + return self._contexts.get(context_id) + + def remove_context(self, context_id: str) -> None: + """Remove stored context (when streaming completes).""" + with self._lock: + self._contexts.pop(context_id, None) + + def store_agent_session_mapping(self, session_id: str, agent_id: str) -> None: + """Store mapping between session and agent for context lookup.""" + with self._lock: + self._agent_sessions[session_id] = agent_id + + def get_agent_context_by_session(self, session_id: str) -> Optional[tuple]: + """Get agent context using session ID.""" + with self._lock: + agent_id = self._agent_sessions.get(session_id) + if agent_id: + return self._contexts.get(agent_id) + return None + + def clear_all(self) -> None: + """Clear all stored contexts.""" + with self._lock: + self._contexts.clear() + self._agent_sessions.clear() + + +# Global context manager instance +_streaming_context_manager = StreamingContextManager() + + +# Methods to wrap for instrumentation +WRAPPED_METHODS: List[WrapConfig] = [ + # Workflow session methods + WrapConfig( + trace_name="agno.workflow.session.load_session", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="load_session", + handler=get_workflow_session_attributes, + ), + WrapConfig( + trace_name="agno.workflow.session.new_session", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="new_session", + handler=get_workflow_session_attributes, + ), + WrapConfig( + trace_name="agno.workflow.session.read_from_storage", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="read_from_storage", + handler=get_workflow_session_attributes, + ), + WrapConfig( + trace_name="agno.workflow.session.write_to_storage", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="write_to_storage", + handler=get_workflow_session_attributes, + ), +] + + +class StreamingResultWrapper: + """Wrapper for streaming results that maintains agent span as active throughout iteration.""" + + def __init__(self, original_result, span, agent_id, agent_context): + self.original_result = original_result + self.span = span + self.agent_id = agent_id + self.agent_context = agent_context + self._consumed = False + + def __iter__(self): + """Return iterator that keeps agent span active during iteration.""" + context_token = otel_context.attach(self.agent_context) + try: + # Execute iteration within agent context + for item in self.original_result: + # Each item is yielded within the agent span context + yield item + finally: + # Clean up when iteration is complete + otel_context.detach(context_token) + if not self._consumed: + self._consumed = True + self.span.end() + _streaming_context_manager.remove_context(self.agent_id) + + def __getattr__(self, name): + """Delegate attribute access to the original result.""" + return getattr(self.original_result, name) + + +def create_streaming_workflow_wrapper(tracer): + """Create a streaming-aware wrapper for workflow run methods.""" + + def wrapper(wrapped, instance, args, kwargs): + # Get workflow ID for context storage + workflow_id = getattr(instance, "workflow_id", None) or getattr(instance, "id", None) or id(instance) + workflow_id = str(workflow_id) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + + # For streaming, manually manage span lifecycle + if is_streaming: + span = tracer.start_span("agno.workflow.run.workflow") + + try: + # Set workflow attributes + attributes = get_workflow_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Store context for streaming - capture current context with active span + current_context = trace.set_span_in_context(span, otel_context.get_current()) + _streaming_context_manager.store_context(workflow_id, current_context, span) + + # Execute the original function within workflow context + context_token = otel_context.attach(current_context) + try: + result = wrapped(*args, **kwargs) + finally: + otel_context.detach(context_token) + + # Set result attributes + result_attributes = get_workflow_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + + # For streaming results, we need to keep the span open + # The span will be closed when streaming completes + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + _streaming_context_manager.remove_context(workflow_id) + raise + else: + # For non-streaming, use normal context manager + with tracer.start_as_current_span("agno.workflow.run.workflow") as span: + try: + # Set workflow attributes + attributes = get_workflow_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_workflow_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_streaming_workflow_async_wrapper(tracer): + """Create a streaming-aware async wrapper for workflow run methods.""" + + async def wrapper(wrapped, instance, args, kwargs): + # Get workflow ID for context storage + workflow_id = getattr(instance, "workflow_id", None) or getattr(instance, "id", None) or id(instance) + workflow_id = str(workflow_id) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + + # For streaming, manually manage span lifecycle + if is_streaming: + span = tracer.start_span("agno.workflow.run.workflow") + + try: + # Set workflow attributes + attributes = get_workflow_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Store context for streaming - capture current context with active span + current_context = trace.set_span_in_context(span, otel_context.get_current()) + _streaming_context_manager.store_context(workflow_id, current_context, span) + + # Execute the original function within workflow context + context_token = otel_context.attach(current_context) + try: + result = await wrapped(*args, **kwargs) + finally: + otel_context.detach(context_token) + + # Set result attributes + result_attributes = get_workflow_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + + # For streaming results, we need to keep the span open + # The span will be closed when streaming completes + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + _streaming_context_manager.remove_context(workflow_id) + raise + else: + # For non-streaming, use normal context manager + with tracer.start_as_current_span("agno.workflow.run.workflow") as span: + try: + # Set workflow attributes + attributes = get_workflow_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = await wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_workflow_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_streaming_agent_wrapper(tracer): + """Create a streaming-aware wrapper for agent run methods.""" + + def wrapper(wrapped, instance, args, kwargs): + # Get agent ID for context storage + agent_id = getattr(instance, "agent_id", None) or getattr(instance, "id", None) or id(instance) + agent_id = str(agent_id) + + # Get session ID for context mapping + session_id = getattr(instance, "session_id", None) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + + # For streaming, manually manage span lifecycle + if is_streaming: + span = tracer.start_span("agno.agent.run.agent") + + try: + # Set agent attributes + attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Store context for streaming - capture current context with active span + current_context = trace.set_span_in_context(span, otel_context.get_current()) + _streaming_context_manager.store_context(agent_id, current_context, span) + + # Store session-to-agent mapping for LLM context lookup + if session_id: + _streaming_context_manager.store_agent_session_mapping(session_id, agent_id) + + # Execute the original function within agent context + context_token = otel_context.attach(current_context) + try: + result = wrapped(*args, **kwargs) + finally: + otel_context.detach(context_token) + + # Set result attributes + result_attributes = get_agent_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + + # Wrap the result to maintain context and end span when complete + if hasattr(result, "__iter__"): + return StreamingResultWrapper(result, span, agent_id, current_context) + else: + # Not actually streaming, clean up immediately + span.end() + _streaming_context_manager.remove_context(agent_id) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + _streaming_context_manager.remove_context(agent_id) + raise + else: + # For non-streaming, use normal context manager + with tracer.start_as_current_span("agno.agent.run.agent") as span: + try: + # Set agent attributes + attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_agent_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_streaming_agent_async_wrapper(tracer): + """Create a streaming-aware async wrapper for agent run methods.""" + + async def wrapper(wrapped, instance, args, kwargs): + # Get agent ID for context storage + agent_id = getattr(instance, "agent_id", None) or getattr(instance, "id", None) or id(instance) + agent_id = str(agent_id) + + # Get session ID for context mapping + session_id = getattr(instance, "session_id", None) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + + # For streaming, manually manage span lifecycle + if is_streaming: + span = tracer.start_span("agno.agent.run.agent") + + try: + # Set agent attributes + attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Store context for streaming - capture current context with active span + current_context = trace.set_span_in_context(span, otel_context.get_current()) + _streaming_context_manager.store_context(agent_id, current_context, span) + + # Store session-to-agent mapping for LLM context lookup + if session_id: + _streaming_context_manager.store_agent_session_mapping(session_id, agent_id) + + # Execute the original function within agent context + context_token = otel_context.attach(current_context) + try: + result = await wrapped(*args, **kwargs) + finally: + otel_context.detach(context_token) + + # Set result attributes + result_attributes = get_agent_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + + # Wrap the result to maintain context and end span when complete + if hasattr(result, "__iter__"): + return StreamingResultWrapper(result, span, agent_id, current_context) + else: + # Not actually streaming, clean up immediately + span.end() + _streaming_context_manager.remove_context(agent_id) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + _streaming_context_manager.remove_context(agent_id) + raise + else: + # For non-streaming, use normal context manager + with tracer.start_as_current_span("agno.agent.run.agent") as span: + try: + # Set agent attributes + attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = await wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_agent_run_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_streaming_tool_wrapper(tracer): + """Create a streaming-aware wrapper for tool execution methods.""" + + def wrapper(wrapped, instance, args, kwargs): + # Try to find the agent or workflow context for proper span hierarchy + parent_context = None + parent_span = None + + # Try to get context from agent + try: + if hasattr(instance, "_agent"): + agent = instance._agent + agent_id = getattr(agent, "agent_id", None) or getattr(agent, "id", None) or id(agent) + agent_id = str(agent_id) + context_info = _streaming_context_manager.get_context(agent_id) + if context_info: + parent_context, parent_span = context_info + except Exception: + pass # Continue without agent context if not found + + # Try to get context from workflow if agent context not found + if not parent_context: + try: + if hasattr(instance, "_workflow"): + workflow = instance._workflow + workflow_id = ( + getattr(workflow, "workflow_id", None) or getattr(workflow, "id", None) or id(workflow) + ) + workflow_id = str(workflow_id) + context_info = _streaming_context_manager.get_context(workflow_id) + if context_info: + parent_context, parent_span = context_info + except Exception: + pass # Continue without workflow context if not found + + # Use parent context if available, otherwise use current context + if parent_context: + context_token = otel_context.attach(parent_context) + try: + with tracer.start_as_current_span("agno.tool.execute.tool_usage") as span: + try: + # Set tool attributes + attributes = get_tool_execution_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_tool_execution_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + finally: + otel_context.detach(context_token) + else: + # Fallback to normal span creation + with tracer.start_as_current_span("agno.tool.execute.tool_usage") as span: + try: + # Set tool attributes + attributes = get_tool_execution_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_tool_execution_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_metrics_wrapper(tracer): + """Create a wrapper for metrics methods with dynamic span naming.""" + + def wrapper(wrapped, instance, args, kwargs): + # Extract model ID for dynamic span naming + span_name = "agno.agent.metrics" # fallback + if hasattr(instance, "model") and instance.model and hasattr(instance.model, "id"): + model_id = str(instance.model.id) + span_name = f"{model_id}.llm" + + with tracer.start_as_current_span(span_name) as span: + try: + # Set attributes + attributes = get_metrics_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_metrics_attributes(args=(instance,) + args, kwargs=kwargs, return_value=result) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_team_internal_wrapper(tracer): + """Create a wrapper for Team internal methods (_run/_arun) that manages team span lifecycle.""" + + def wrapper(wrapped, instance, args, kwargs): + # Get team ID for context storage + team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) + team_id = str(team_id) + + # Check if we already have a team context (from print_response) + existing_context = _streaming_context_manager.get_context(team_id) + + if existing_context: + # We're being called from print_response, use existing context + parent_context, parent_span = existing_context + + # Execute within the existing team context + context_token = otel_context.attach(parent_context) + try: + with tracer.start_as_current_span("agno.team.run.workflow") as span: + try: + # Set workflow attributes + attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + finally: + # Close the parent team span when workflow completes + if parent_span: + parent_span.end() + _streaming_context_manager.remove_context(team_id) + finally: + otel_context.detach(context_token) + else: + # Direct call to _run, create new team span + with tracer.start_as_current_span("agno.team.run.workflow") as span: + try: + # Set workflow attributes + attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_team_internal_async_wrapper(tracer): + """Create an async wrapper for Team internal methods (_arun) that manages team span lifecycle.""" + + async def wrapper(wrapped, instance, args, kwargs): + # Get team ID for context storage + team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) + team_id = str(team_id) + + # Check if we already have a team context (from print_response) + existing_context = _streaming_context_manager.get_context(team_id) + + if existing_context: + # We're being called from print_response, use existing context + parent_context, parent_span = existing_context + + # Execute within the existing team context + context_token = otel_context.attach(parent_context) + try: + with tracer.start_as_current_span("agno.team.run.workflow") as span: + try: + # Set workflow attributes + attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = await wrapped(*args, **kwargs) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + finally: + # Close the parent team span when workflow completes + if parent_span: + parent_span.end() + _streaming_context_manager.remove_context(team_id) + finally: + otel_context.detach(context_token) + else: + # Direct call to _arun, create new team span + with tracer.start_as_current_span("agno.team.run.workflow") as span: + try: + # Set workflow attributes + attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = await wrapped(*args, **kwargs) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + return wrapper + + +def create_team_wrapper(tracer): + """Create a wrapper for Team methods that establishes the team context.""" + + def wrapper(wrapped, instance, args, kwargs): + # Get team ID for context storage + team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) + team_id = str(team_id) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + + # For print_response, we need to wrap the internal _run method instead + # because print_response returns immediately + if wrapped.__name__ == "print_response": + # Create team span but don't manage it here + span = tracer.start_span("agno.team.run.agent") + + try: + # Set team attributes + attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Store context for child spans + current_context = trace.set_span_in_context(span, otel_context.get_current()) + _streaming_context_manager.store_context(team_id, current_context, span) + + # The span will be closed by the internal _run method + # Just execute print_response normally + result = wrapped(*args, **kwargs) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + _streaming_context_manager.remove_context(team_id) + raise + else: + # For run/arun methods, use standard span management + span = tracer.start_span("agno.team.run.agent") + + try: + # Set team attributes + attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Store context for child spans + current_context = trace.set_span_in_context(span, otel_context.get_current()) + _streaming_context_manager.store_context(team_id, current_context, span) + + # Execute the original function within team context + context_token = otel_context.attach(current_context) + try: + result = wrapped(*args, **kwargs) + + # For streaming results, wrap them to keep span alive + if is_streaming and hasattr(result, "__iter__"): + return StreamingResultWrapper(result, span, team_id, current_context) + else: + # Non-streaming, close span + span.end() + _streaming_context_manager.remove_context(team_id) + return result + + finally: + otel_context.detach(context_token) + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + _streaming_context_manager.remove_context(team_id) + raise + + return wrapper + + +def create_team_async_wrapper(tracer): + """Create an async wrapper for Team methods that establishes the team context.""" + + async def wrapper(wrapped, instance, args, kwargs): + # Get team ID for context storage + team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) + team_id = str(team_id) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + + # Create team span + span = tracer.start_span("agno.team.run.agent") + + try: + # Set team attributes + attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Store context for child spans - capture current context with active span + current_context = trace.set_span_in_context(span, otel_context.get_current()) + _streaming_context_manager.store_context(team_id, current_context, span) + + # Execute the original function within team context + context_token = otel_context.attach(current_context) + try: + result = await wrapped(*args, **kwargs) + + # For non-streaming, close the span + if not is_streaming: + span.end() + _streaming_context_manager.remove_context(team_id) + + return result + finally: + otel_context.detach(context_token) + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + _streaming_context_manager.remove_context(team_id) + raise + + return wrapper + + +def get_agent_context_for_llm(): + """Helper function for LLM instrumentation to get current agent context.""" + current_context = otel_context.get_current() + current_span = trace.get_current_span(current_context) + + # Check if we're already in an agent span + if current_span and hasattr(current_span, "name") and "agent" in current_span.name: + return current_context, current_span + + # Try to find stored agent context by checking active contexts + # This is a fallback for cases where context isn't properly propagated + return None, None + + +class AgnoInstrumentor(BaseInstrumentor): + """Agno instrumentation class.""" + + def instrumentation_dependencies(self) -> Collection[str]: + """Returns list of packages required for instrumentation.""" + return ["agno >= 0.1.0"] + + def _instrument(self, **kwargs): + """Install instrumentation for Agno.""" + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) + + meter_provider = kwargs.get("meter_provider") + meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider) + + # Create metrics + meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description="Measures number of input and output tokens used with Agno agents", + ) + + meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, + unit="s", + description="Agno agent operation duration", + ) + + meter.create_counter( + name=Meters.LLM_COMPLETIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during Agno agent operations", + ) + + # Standard method wrapping using WrapConfig + for wrap_config in WRAPPED_METHODS: + try: + wrap(wrap_config, tracer) + except (AttributeError, ModuleNotFoundError): + logger.debug(f"Could not wrap {wrap_config}") + + # Special handling for streaming methods + # These require custom wrappers due to their streaming nature + try: + # Streaming agent methods + wrap_function_wrapper( + "agno.agent", + "Agent.run", + create_streaming_agent_wrapper(tracer), + ) + wrap_function_wrapper( + "agno.agent", + "Agent.arun", + create_streaming_agent_async_wrapper(tracer), + ) + + # Streaming workflow methods + wrap_function_wrapper( + "agno.workflow.workflow", + "Workflow.run_workflow", + create_streaming_workflow_wrapper(tracer), + ) + wrap_function_wrapper( + "agno.workflow.workflow", + "Workflow.arun_workflow", + create_streaming_workflow_async_wrapper(tracer), + ) + + # Streaming tool execution + wrap_function_wrapper( + "agno.tools.function", + "FunctionCall.execute", + create_streaming_tool_wrapper(tracer), + ) + + # Metrics wrapper + wrap_function_wrapper( + "agno.agent", + "Agent._set_session_metrics", + create_metrics_wrapper(tracer), + ) + + # Team methods + wrap_function_wrapper( + "agno.team.team", + "Team.run", + create_team_wrapper(tracer), + ) + wrap_function_wrapper( + "agno.team.team", + "Team.arun", + create_team_async_wrapper(tracer), + ) + wrap_function_wrapper( + "agno.team.team", + "Team.print_response", + create_team_wrapper(tracer), + ) + + # Team internal methods with special handling + wrap_function_wrapper( + "agno.team.team", + "Team._run", + create_team_internal_wrapper(tracer), + ) + wrap_function_wrapper( + "agno.team.team", + "Team._arun", + create_team_internal_async_wrapper(tracer), + ) + + logger.debug("Successfully wrapped Agno streaming methods") + except (AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Failed to wrap Agno streaming methods: {e}") + + logger.info("Agno instrumentation installed successfully") + + def _uninstrument(self, **kwargs): + """Remove instrumentation for Agno.""" + # Clear streaming contexts + _streaming_context_manager.clear_all() + + # Unwrap standard methods + for wrap_config in WRAPPED_METHODS: + try: + unwrap(wrap_config) + except Exception: + logger.debug(f"Failed to unwrap {wrap_config}") + + # Unwrap streaming methods + try: + from opentelemetry.instrumentation.utils import unwrap as otel_unwrap + + # Agent methods + otel_unwrap("agno.agent", "Agent.run") + otel_unwrap("agno.agent", "Agent.arun") + + # Workflow methods + otel_unwrap("agno.workflow.workflow", "Workflow.run_workflow") + otel_unwrap("agno.workflow.workflow", "Workflow.arun_workflow") + + # Tool methods + otel_unwrap("agno.tools.function", "FunctionCall.execute") + + # Metrics methods + otel_unwrap("agno.agent", "Agent._set_session_metrics") + + # Team methods + otel_unwrap("agno.team.team", "Team.run") + otel_unwrap("agno.team.team", "Team.arun") + otel_unwrap("agno.team.team", "Team.print_response") + otel_unwrap("agno.team.team", "Team._run") + otel_unwrap("agno.team.team", "Team._arun") + + except (AttributeError, ModuleNotFoundError): + logger.debug("Failed to unwrap Agno streaming methods") + + logger.info("Agno instrumentation removed successfully") diff --git a/agentops/semconv/workflow.py b/agentops/semconv/workflow.py index 5d3199e26..e2bdfbaf5 100644 --- a/agentops/semconv/workflow.py +++ b/agentops/semconv/workflow.py @@ -25,3 +25,57 @@ class WorkflowAttributes: WORKFLOW_STEP_STATUS = "workflow.step.status" # Status of the workflow step WORKFLOW_STEP_ERROR = "workflow.step.error" # Error from the workflow step WORKFLOW_STEP = "workflow.step" + + # Core workflow identification + WORKFLOW_ID = "workflow.workflow_id" # Unique identifier for the workflow instance + WORKFLOW_DESCRIPTION = "workflow.description" # Description of the workflow + WORKFLOW_APP_ID = "workflow.app_id" # Application ID associated with the workflow + + # Session and user context + WORKFLOW_SESSION_ID = "workflow.session_id" # Session ID for the workflow execution + WORKFLOW_SESSION_NAME = "workflow.session_name" # Name of the workflow session + WORKFLOW_USER_ID = "workflow.user_id" # User ID associated with the workflow + + # Run-specific attributes + WORKFLOW_RUN_ID = "workflow.run_id" # Unique identifier for this workflow run + + # Configuration flags + WORKFLOW_DEBUG_MODE = "workflow.debug_mode" # Whether debug mode is enabled + WORKFLOW_MONITORING = "workflow.monitoring" # Whether monitoring is enabled + WORKFLOW_TELEMETRY = "workflow.telemetry" # Whether telemetry is enabled + + # Memory and storage + WORKFLOW_MEMORY_TYPE = "workflow.memory.type" # Type of memory used by workflow + WORKFLOW_STORAGE_TYPE = "workflow.storage.type" # Type of storage used by workflow + + # Input parameters metadata + WORKFLOW_INPUT_PARAMETER_COUNT = "workflow.input.parameter_count" # Number of input parameters + WORKFLOW_INPUT_PARAMETER_KEYS = "workflow.input.parameter_keys" # Keys of input parameters + + # Method metadata + WORKFLOW_METHOD_PARAMETER_COUNT = "workflow.method.parameter_count" # Number of method parameters + WORKFLOW_METHOD_RETURN_TYPE = "workflow.method.return_type" # Return type of the workflow method + + # Output metadata + WORKFLOW_OUTPUT_CONTENT_TYPE = "workflow.output.content_type" # Content type of the output + WORKFLOW_OUTPUT_EVENT = "workflow.output.event" # Event type in the output + WORKFLOW_OUTPUT_MODEL = "workflow.output.model" # Model used for the output + WORKFLOW_OUTPUT_MODEL_PROVIDER = "workflow.output.model_provider" # Provider of the model + WORKFLOW_OUTPUT_MESSAGE_COUNT = "workflow.output.message_count" # Number of messages in output + WORKFLOW_OUTPUT_TOOL_COUNT = "workflow.output.tool_count" # Number of tools in output + WORKFLOW_OUTPUT_IMAGE_COUNT = "workflow.output.image_count" # Number of images in output + WORKFLOW_OUTPUT_VIDEO_COUNT = "workflow.output.video_count" # Number of videos in output + WORKFLOW_OUTPUT_AUDIO_COUNT = "workflow.output.audio_count" # Number of audio items in output + WORKFLOW_OUTPUT_IS_STREAMING = "workflow.output.is_streaming" # Whether output is streaming + + # Session-specific attributes + WORKFLOW_SESSION_SESSION_ID = "workflow.session.session_id" # Session ID in session context + WORKFLOW_SESSION_SESSION_NAME = "workflow.session.session_name" # Session name in session context + WORKFLOW_SESSION_WORKFLOW_ID = "workflow.session.workflow_id" # Workflow ID in session context + WORKFLOW_SESSION_USER_ID = "workflow.session.user_id" # User ID in session context + WORKFLOW_SESSION_STATE_KEYS = "workflow.session.state_keys" # Keys in session state + WORKFLOW_SESSION_STATE_SIZE = "workflow.session.state_size" # Size of session state + WORKFLOW_SESSION_STORAGE_TYPE = "workflow.session.storage_type" # Storage type for session + WORKFLOW_SESSION_RETURNED_SESSION_ID = "workflow.session.returned_session_id" # Session ID returned + WORKFLOW_SESSION_CREATED_AT = "workflow.session.created_at" # Session creation timestamp + WORKFLOW_SESSION_UPDATED_AT = "workflow.session.updated_at" # Session update timestamp diff --git a/docs/images/external/agno/agno.png b/docs/images/external/agno/agno.png new file mode 100644 index 000000000..5251e720d Binary files /dev/null and b/docs/images/external/agno/agno.png differ diff --git a/docs/mint.json b/docs/mint.json index a6fcf4af1..1784b16f7 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -168,6 +168,7 @@ "group": "Integrations", "pages": [ "v2/integrations/ag2", + "v2/integrations/agno", "v2/integrations/anthropic", "v2/integrations/autogen", "v2/integrations/crewai", diff --git a/docs/v2/examples/agno.mdx b/docs/v2/examples/agno.mdx new file mode 100644 index 000000000..b587721e8 --- /dev/null +++ b/docs/v2/examples/agno.mdx @@ -0,0 +1,120 @@ +--- +title: 'Agno' +description: 'Async Operations with Agno' +--- +{/* SOURCE_FILE: examples/agno/agno_async_operations.ipynb */} + +_View Notebook on Github_ + +# Async Operations with Agno + +This notebook demonstrates how to leverage asynchronous programming with Agno agents to execute multiple AI tasks concurrently, significantly improving performance and efficiency. + +## Overview +This notebook demonstrates a practical example of concurrent AI operations where we: + +1. **Initialize an Agno agent** with OpenAI's GPT-4o-mini model +2. **Create multiple async tasks** that query the AI about different programming languages +3. **Compare performance** between concurrent and sequential execution + +By using async operations, you can run multiple AI queries simultaneously instead of waiting for each one to complete sequentially. This is particularly beneficial when dealing with I/O-bound operations like API calls to AI models. + + + + + +## Installation + + ```bash pip + pip install agentops agno python-dotenv + ``` + ```bash poetry + poetry add agentops agno python-dotenv + ``` + ```bash uv + uv add agentops agno python-dotenv + ``` + + +```python +import os +import asyncio +from dotenv import load_dotenv + +import agentops +from agno.agent import Agent +from agno.team import Team +from agno.models.openai import OpenAIChat +``` + + +```python +load_dotenv() +os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here") +os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_agentops_api_key_here") +``` + + +```python +agentops.init(auto_start_session=False, tags=["agno-example", "async-operation"]) +``` + + +```python +async def demonstrate_async_operations(): + """ + Demonstrate concurrent execution of multiple AI agent tasks. + + This function creates multiple async tasks that execute concurrently rather than sequentially. + Each task makes an independent API call to the AI model, and asyncio.gather() + waits for all tasks to complete before returning results. + + Performance benefit: Instead of 3 sequential calls taking ~90 seconds total, + concurrent execution typically completes in ~30 seconds. + """ + tracer = agentops.start_trace(trace_name="Agno Async Operations Example",) + + try: + # Initialize AI agent with specified model + agent = Agent(model=OpenAIChat(id="gpt-4o-mini")) + + async def task1(): + """Query AI about Python programming language.""" + response = await agent.arun("Explain Python programming language in one paragraph") + return f"Python: {response.content}" + + async def task2(): + """Query AI about JavaScript programming language.""" + response = await agent.arun("Explain JavaScript programming language in one paragraph") + return f"JavaScript: {response.content}" + + async def task3(): + """Query AI for comparison between programming languages.""" + response = await agent.arun("Compare Python and JavaScript briefly") + return f"Comparison: {response.content}" + + # Execute all tasks concurrently using asyncio.gather() + results = await asyncio.gather(task1(), task2(), task3()) + + for i, result in enumerate(results, 1): + print(f"\nTask {i} Result:") + print(result) + print("-" * 50) + + agentops.end_trace(tracer, end_state="Success") + + except Exception as e: + print(f"An error occurred: {e}") + agentops.end_trace(tracer, end_state="Error") +``` + + +```python +await demonstrate_async_operations() +``` + + + + + + \ No newline at end of file diff --git a/docs/v2/examples/examples.mdx b/docs/v2/examples/examples.mdx index bdf9c2b82..9021e70f6 100644 --- a/docs/v2/examples/examples.mdx +++ b/docs/v2/examples/examples.mdx @@ -44,6 +44,10 @@ description: 'Examples of AgentOps with various integrations' Multi-agent conversations with memory capabilities + + Modern AI agent framework with teams, workflows, and tool integration + + } iconType="image" href="/v2/examples/autogen"> AG2 multi-agent workflow demonstration diff --git a/docs/v2/integrations/agno.mdx b/docs/v2/integrations/agno.mdx new file mode 100644 index 000000000..6adf71348 --- /dev/null +++ b/docs/v2/integrations/agno.mdx @@ -0,0 +1,155 @@ +--- +title: Agno +description: "Track your Agno agents, teams, and workflows with AgentOps" +--- + +[Agno](https://docs.agno.com) is a modern AI agent framework for building intelligent agents, teams, and workflows. AgentOps provides automatic instrumentation to track all Agno operations including agent interactions, team coordination, tool usage, and workflow execution. + +## Installation + +Install AgentOps and Agno: + + + ```bash pip + pip install agentops agno + ``` + ```bash poetry + poetry add agentops agno + ``` + ```bash uv + uv add agentops agno + ``` + + +## Setting Up API Keys + +You'll need API keys for AgentOps and your chosen LLM provider: +- **AGENTOPS_API_KEY**: From your [AgentOps Dashboard](https://app.agentops.ai/) +- **OPENAI_API_KEY**: From the [OpenAI Platform](https://platform.openai.com/api-keys) (if using OpenAI) +- **ANTHROPIC_API_KEY**: From [Anthropic Console](https://console.anthropic.com/) (if using Claude) + +Set these as environment variables or in a `.env` file. + + + ```bash Export to CLI + export AGENTOPS_API_KEY="your_agentops_api_key_here" + export OPENAI_API_KEY="your_openai_api_key_here" + export ANTHROPIC_API_KEY="your_anthropic_api_key_here" # Optional + ``` + ```txt Set in .env file + AGENTOPS_API_KEY="your_agentops_api_key_here" + OPENAI_API_KEY="your_openai_api_key_here" + ANTHROPIC_API_KEY="your_anthropic_api_key_here" # Optional + ``` + + +## Quick Start + +```python +import os +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + + +from agno.agent import Agent +from agno.team import Team +from agno.models.openai import OpenAIChat + +# Initialize AgentOps +import agentops +agentops.init(api_key=os.getenv("AGENTOPS_API_KEY")) + +# Create and run an agent +agent = Agent( + name="Assistant", + role="Helpful AI assistant", + model=OpenAIChat(id="gpt-4o-mini") +) + +response = agent.run("What are the key benefits of AI agents?") +print(response.content) +``` + +## AgentOps Integration + +### Basic Agent Tracking + +AgentOps automatically instruments Agno agents and teams: + +```python +import agentops +from agno.agent import Agent +from agno.team import Team +from agno.models.openai import OpenAIChat + +# Initialize AgentOps - this enables automatic tracking +agentops.init(api_key=os.getenv("AGENTOPS_API_KEY")) + +# Create agents - automatically tracked by AgentOps +agent = Agent( + name="Assistant", + role="Helpful AI assistant", + model=OpenAIChat(id="gpt-4o-mini") +) + +# Create teams - coordination automatically tracked +team = Team( + name="Research Team", + mode="coordinate", + members=[agent] +) + +# All operations are automatically logged to AgentOps +response = team.run("Analyze the current AI market trends") +print(response.content) +``` + +## What Gets Tracked + +AgentOps automatically captures: + +- **Agent Interactions**: All agent inputs, outputs, and configurations +- **Team Coordination**: Multi-agent collaboration patterns and results +- **Tool Executions**: Function calls, parameters, and return values +- **Workflow Steps**: Session states, caching, and performance metrics +- **Token Usage**: Costs and resource consumption across all operations +- **Timing Metrics**: Response times and concurrent operation performance +- **Error Tracking**: Failures and debugging information + + +## Dashboard and Monitoring + +Once your Agno agents are running with AgentOps, you can monitor them in the [AgentOps Dashboard](https://app.agentops.ai/): + +- **Real-time Monitoring**: Live agent status and performance +- **Execution Traces**: Detailed logs of agent interactions +- **Performance Analytics**: Token usage, costs, and timing metrics +- **Team Collaboration**: Visual representation of multi-agent workflows +- **Error Tracking**: Comprehensive error logs and debugging information + +## Examples + + + + Learn the fundamentals of creating AI agents and organizing them into collaborative teams + + + + Execute multiple AI tasks concurrently for improved performance using asyncio + + + + Build sophisticated multi-agent teams with specialized tools for comprehensive research + + + + Implement Retrieval-Augmented Generation with vector databases and knowledge bases + + + + Create custom workflows with intelligent caching for optimized agent performance + + + diff --git a/docs/v2/introduction.mdx b/docs/v2/introduction.mdx index 10757aabe..02c252328 100644 --- a/docs/v2/introduction.mdx +++ b/docs/v2/introduction.mdx @@ -29,6 +29,7 @@ description: "AgentOps is the developer favorite platform for testing, debugging } iconType="image" href="/v2/integrations/ag2" /> + } iconType="image" href="/v2/integrations/agno" /> } iconType="image" href="/v2/integrations/autogen" /> } iconType="image" href="/v2/integrations/crewai" /> } iconType="image" href="/v2/integrations/google_adk" /> diff --git a/examples/agno/agno_async_operations.ipynb b/examples/agno/agno_async_operations.ipynb new file mode 100644 index 000000000..664dc8b3c --- /dev/null +++ b/examples/agno/agno_async_operations.ipynb @@ -0,0 +1,166 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a35d851b", + "metadata": {}, + "source": [ + "\n", + "# Async Operations with Agno\n", + "\n", + "This notebook demonstrates how to leverage asynchronous programming with Agno agents to execute multiple AI tasks concurrently, significantly improving performance and efficiency.\n", + "\n", + "## Overview\n", + "This notebook demonstrates a practical example of concurrent AI operations where we:\n", + "\n", + "1. **Initialize an Agno agent** with OpenAI's GPT-4o-mini model\n", + "2. **Create multiple async tasks** that query the AI about different programming languages\n", + "3. **Compare performance** between concurrent and sequential execution\n", + "\n", + "By using async operations, you can run multiple AI queries simultaneously instead of waiting for each one to complete sequentially. This is particularly beneficial when dealing with I/O-bound operations like API calls to AI models.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "75767381", + "metadata": {}, + "outputs": [], + "source": [ + "# Install the required dependencies:\n", + "%pip install agentops\n", + "%pip install agno\n", + "%pip install python-dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fe7d8b83", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import asyncio\n", + "from dotenv import load_dotenv\n", + "\n", + "import agentops\n", + "from agno.agent import Agent\n", + "from agno.team import Team\n", + "from agno.models.openai import OpenAIChat" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6653555", + "metadata": {}, + "outputs": [], + "source": [ + "load_dotenv()\n", + "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")\n", + "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_agentops_api_key_here\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ac01eb8a", + "metadata": {}, + "outputs": [], + "source": [ + "agentops.init(auto_start_session=False, tags=[\"agno-example\", \"async-operation\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca0f1a8a", + "metadata": {}, + "outputs": [], + "source": [ + "async def demonstrate_async_operations():\n", + " \"\"\"\n", + " Demonstrate concurrent execution of multiple AI agent tasks.\n", + " \n", + " This function creates multiple async tasks that execute concurrently rather than sequentially.\n", + " Each task makes an independent API call to the AI model, and asyncio.gather() \n", + " waits for all tasks to complete before returning results.\n", + " \n", + " Performance benefit: Instead of 3 sequential calls taking ~90 seconds total,\n", + " concurrent execution typically completes in ~30 seconds.\n", + " \"\"\"\n", + " tracer = agentops.start_trace(trace_name=\"Agno Async Operations Example\",)\n", + "\n", + " try:\n", + " # Initialize AI agent with specified model\n", + " agent = Agent(model=OpenAIChat(id=\"gpt-4o-mini\"))\n", + " \n", + " async def task1():\n", + " \"\"\"Query AI about Python programming language.\"\"\"\n", + " response = await agent.arun(\"Explain Python programming language in one paragraph\")\n", + " return f\"Python: {response.content}\"\n", + "\n", + " async def task2():\n", + " \"\"\"Query AI about JavaScript programming language.\"\"\"\n", + " response = await agent.arun(\"Explain JavaScript programming language in one paragraph\")\n", + " return f\"JavaScript: {response.content}\"\n", + "\n", + " async def task3():\n", + " \"\"\"Query AI for comparison between programming languages.\"\"\"\n", + " response = await agent.arun(\"Compare Python and JavaScript briefly\")\n", + " return f\"Comparison: {response.content}\"\n", + "\n", + " # Execute all tasks concurrently using asyncio.gather()\n", + " results = await asyncio.gather(task1(), task2(), task3())\n", + " \n", + " for i, result in enumerate(results, 1):\n", + " print(f\"\\nTask {i} Result:\")\n", + " print(result)\n", + " print(\"-\" * 50)\n", + "\n", + " agentops.end_trace(tracer, end_state=\"Success\")\n", + "\n", + " except Exception as e:\n", + " print(f\"An error occurred: {e}\")\n", + " agentops.end_trace(tracer, end_state=\"Error\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0aa21331", + "metadata": {}, + "outputs": [], + "source": [ + "await demonstrate_async_operations()" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all", + "main_language": "python", + "notebook_metadata_filter": "-all" + }, + "kernelspec": { + "display_name": "agentops (3.11.11)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/agno/agno_async_operations.py b/examples/agno/agno_async_operations.py new file mode 100644 index 000000000..8f4c43ded --- /dev/null +++ b/examples/agno/agno_async_operations.py @@ -0,0 +1,79 @@ +""" +# Async Operations with Agno + +This notebook demonstrates how to leverage asynchronous programming with Agno agents to execute multiple AI tasks concurrently, significantly improving performance and efficiency. + +## Overview + +This notebook demonstrates a practical example of concurrent AI operations where we: + +1. **Initialize an Agno agent** with OpenAI's GPT-4o-mini model +2. **Create multiple async tasks** that query the AI about different programming languages +3. **Compare performance** between concurrent and sequential execution + +By using async operations, you can run multiple AI queries simultaneously instead of waiting for each one to complete sequentially. This is particularly beneficial when dealing with I/O-bound operations like API calls to AI models. +""" +import os +import asyncio +from dotenv import load_dotenv +import agentops +from agno.agent import Agent +from agno.models.openai import OpenAIChat + +load_dotenv() + +os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here") +os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_agentops_api_key_here") + +agentops.init(auto_start_session=False, tags=["agno-example", "async-operation"]) + + +async def demonstrate_async_operations(): + """ + Demonstrate concurrent execution of multiple AI agent tasks. + + This function creates multiple async tasks that execute concurrently rather than sequentially. + Each task makes an independent API call to the AI model, and asyncio.gather() + waits for all tasks to complete before returning results. + + Performance benefit: Instead of 3 sequential calls taking ~90 seconds total, + concurrent execution typically completes in ~30 seconds. + """ + tracer = agentops.start_trace(trace_name="Agno Async Operations Example") + + try: + # Initialize AI agent with specified model + agent = Agent(model=OpenAIChat(id="gpt-4o-mini")) + + async def task1(): + """Query AI about Python programming language.""" + response = await agent.arun("Explain Python programming language in one paragraph") + return f"Python: {response.content}" + + async def task2(): + """Query AI about JavaScript programming language.""" + response = await agent.arun("Explain JavaScript programming language in one paragraph") + return f"JavaScript: {response.content}" + + async def task3(): + """Query AI for comparison between programming languages.""" + response = await agent.arun("Compare Python and JavaScript briefly") + return f"Comparison: {response.content}" + + # Execute all tasks concurrently using asyncio.gather() + results = await asyncio.gather(task1(), task2(), task3()) + + for i, result in enumerate(results, 1): + print(f"\nTask {i} Result:") + print(result) + print("-" * 50) + + agentops.end_trace(tracer, end_state="Success") + + except Exception as e: + print(f"An error occurred: {e}") + agentops.end_trace(tracer, end_state="Error") + + +if __name__ == "__main__": + asyncio.run(demonstrate_async_operations()) diff --git a/examples/agno/agno_basic_agents.ipynb b/examples/agno/agno_basic_agents.ipynb new file mode 100644 index 000000000..4988be5c6 --- /dev/null +++ b/examples/agno/agno_basic_agents.ipynb @@ -0,0 +1,216 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "8b2111ae", + "metadata": {}, + "source": [ + "# Basic Agents and Teams with Agno\n", + "\n", + "This example demonstrates the fundamentals of creating AI agents and organizing them into collaborative teams using the Agno framework.\n", + "\n", + "## Overview\n", + "\n", + "In this example, you'll learn how to:\n", + "- **Create specialized AI agents** with specific roles and expertise\n", + "- **Organize agents into teams** for collaborative problem-solving\n", + "- **Use coordination modes** for effective agent communication\n", + "- **Monitor agent interactions** with AgentOps integration\n", + "\n", + "## Key Concepts\n", + "\n", + "### Agents\n", + "Individual AI entities with specific roles and capabilities. Each agent can be assigned a particular area of expertise, making them specialists in their domain.\n", + "\n", + "### Teams\n", + "Collections of agents that work together to solve complex tasks. Teams can coordinate their responses, share information, and delegate tasks based on each agent's expertise.\n", + "\n", + "### Coordination Modes\n", + "Different strategies for how agents within a team interact and collaborate. The \"coordinate\" mode enables intelligent task routing and information sharing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d087e416", + "metadata": {}, + "outputs": [], + "source": [ + "# Install the required dependencies\n", + "%pip install agentops\n", + "%pip install agno\n", + "%pip install python-dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39ad00cb", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from dotenv import load_dotenv\n", + "\n", + "import agentops\n", + "from agno.agent import Agent\n", + "from agno.team import Team\n", + "from agno.models.openai import OpenAIChat" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f733e281", + "metadata": {}, + "outputs": [], + "source": [ + "load_dotenv()\n", + "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")\n", + "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_agentops_api_key_here\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fb37819a", + "metadata": {}, + "outputs": [], + "source": [ + "agentops.init(auto_start_session=False, tags=[\"agno-example\", \"basics\", \"agents-and-teams\"])" + ] + }, + { + "cell_type": "markdown", + "id": "e954b898", + "metadata": { + "vscode": { + "languageId": "raw" + } + }, + "source": [ + "## Creating Agents and Teams\n", + "\n", + "Now let's create our specialized agents and organize them into a collaborative team:\n", + "\n", + "### Step 1: Create Individual Agents\n", + "We'll create two agents with different specializations:\n", + "- **News Agent**: Specializes in gathering and analyzing news\n", + "- **Weather Agent**: Specializes in weather forecasting and analysis\n", + "\n", + "### Step 2: Form a Team\n", + "We'll combine these agents into a team using the \"coordinate\" mode, which enables:\n", + "- Intelligent task routing based on agent expertise\n", + "- Information sharing between agents\n", + "- Collaborative problem-solving\n", + "\n", + "### Step 3: Execute Tasks\n", + "The team will automatically delegate tasks to the most appropriate agent(s) based on the query.\n" + ] + }, + { + "cell_type": "markdown", + "id": "c12702d0", + "metadata": {}, + "source": [ + "Here's the code to implement this:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f872be2e", + "metadata": {}, + "outputs": [], + "source": [ + "def demonstrate_basic_agents():\n", + " \"\"\"\n", + " Demonstrate basic agent creation and team coordination.\n", + " \n", + " This function shows how to:\n", + " 1. Create specialized agents with specific roles\n", + " 2. Organize agents into a team\n", + " 3. Use the team to solve tasks that require multiple perspectives\n", + " \"\"\"\n", + " tracer = agentops.start_trace(trace_name=\"Agno Basic Agents and Teams Demonstration\",)\n", + "\n", + " try:\n", + " # Create individual agents with specific roles\n", + " # Each agent has a name and a role that defines its expertise\n", + "\n", + " # News Agent: Specializes in gathering and analyzing news information\n", + " news_agent = Agent(\n", + " name=\"News Agent\", \n", + " role=\"Get the latest news and provide news analysis\", \n", + " model=OpenAIChat(id=\"gpt-4o-mini\")\n", + " )\n", + "\n", + " # Weather Agent: Specializes in weather forecasting and analysis\n", + " weather_agent = Agent(\n", + " name=\"Weather Agent\", \n", + " role=\"Get weather forecasts and provide weather analysis\", \n", + " model=OpenAIChat(id=\"gpt-4o-mini\")\n", + " )\n", + "\n", + " # Create a team with coordination mode\n", + " # The \"coordinate\" mode allows agents to work together and share information\n", + " team = Team(\n", + " name=\"News and Weather Team\", \n", + " mode=\"coordinate\", # Agents will coordinate their responses\n", + " members=[news_agent, weather_agent]\n", + " )\n", + "\n", + " # Run a task that requires team coordination\n", + " # The team will automatically determine which agent(s) should respond\n", + " response = team.run(\"What is the weather in Tokyo?\")\n", + " \n", + " print(\"\\nTeam Response:\")\n", + " print(\"-\" * 60)\n", + " print(f\"{response.content}\")\n", + " print(\"-\" * 60)\n", + "\n", + " agentops.end_trace(tracer, end_state=\"Success\")\n", + "\n", + " except Exception as e:\n", + " print(f\"An error occurred: {e}\")\n", + " agentops.end_trace(tracer, end_state=\"Error\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca13c9b0", + "metadata": {}, + "outputs": [], + "source": [ + "demonstrate_basic_agents()" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all", + "main_language": "python", + "notebook_metadata_filter": "-all" + }, + "kernelspec": { + "display_name": "agentops (3.11.11)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/agno/agno_basic_agents.py b/examples/agno/agno_basic_agents.py new file mode 100644 index 000000000..56e042768 --- /dev/null +++ b/examples/agno/agno_basic_agents.py @@ -0,0 +1,92 @@ +""" +# Basic Agents and Teams with Agno + +This example demonstrates the fundamentals of creating AI agents and organizing them into collaborative teams using the Agno framework. + +## Overview + +In this example, you'll learn how to: +- **Create specialized AI agents** with specific roles and expertise +- **Organize agents into teams** for collaborative problem-solving +- **Use coordination modes** for effective agent communication +- **Monitor agent interactions** with AgentOps integration + +## Key Concepts + +### Agents +Individual AI entities with specific roles and capabilities. Each agent can be assigned a particular area of expertise, making them specialists in their domain. + +### Teams +Collections of agents that work together to solve complex tasks. Teams can coordinate their responses, share information, and delegate tasks based on each agent's expertise. + +### Coordination Modes +Different strategies for how agents within a team interact and collaborate. The "coordinate" mode enables intelligent task routing and information sharing. +""" +import os +from dotenv import load_dotenv +import agentops +from agno.agent import Agent +from agno.team import Team +from agno.models.openai import OpenAIChat + +load_dotenv() + +os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here") +os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_agentops_api_key_here") + +agentops.init(auto_start_session=False, tags=["agno-example", "basics", "agents-and-teams"]) + + +def demonstrate_basic_agents(): + """ + Demonstrate basic agent creation and team coordination. + + This function shows how to: + 1. Create specialized agents with specific roles + 2. Organize agents into a team + 3. Use the team to solve tasks that require multiple perspectives + """ + tracer = agentops.start_trace(trace_name="Agno Basic Agents and Teams Demonstration") + + try: + # Create individual agents with specific roles + # Each agent has a name and a role that defines its expertise + + # News Agent: Specializes in gathering and analyzing news information + news_agent = Agent( + name="News Agent", role="Get the latest news and provide news analysis", model=OpenAIChat(id="gpt-4o-mini") + ) + + # Weather Agent: Specializes in weather forecasting and analysis + weather_agent = Agent( + name="Weather Agent", + role="Get weather forecasts and provide weather analysis", + model=OpenAIChat(id="gpt-4o-mini"), + ) + + # Create a team with coordination mode + # The "coordinate" mode allows agents to work together and share information + team = Team( + name="News and Weather Team", + mode="coordinate", # Agents will coordinate their responses + members=[news_agent, weather_agent], + ) + + # Run a task that requires team coordination + # The team will automatically determine which agent(s) should respond + response = team.run("What is the weather in Tokyo?") + + print("\nTeam Response:") + print("-" * 60) + print(f"{response.content}") + print("-" * 60) + + agentops.end_trace(tracer, end_state="Success") + + except Exception as e: + print(f"An error occurred: {e}") + agentops.end_trace(tracer, end_state="Error") + + +if __name__ == "__main__": + demonstrate_basic_agents() diff --git a/examples/agno/agno_research_team.ipynb b/examples/agno/agno_research_team.ipynb new file mode 100644 index 000000000..1f14a77fb --- /dev/null +++ b/examples/agno/agno_research_team.ipynb @@ -0,0 +1,303 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "e78c48e2", + "metadata": {}, + "source": [ + "\n", + "Collaborative Research Team with Agno\n", + "--------------------------------------\n", + "\n", + "This example demonstrates how to create a sophisticated research team with multiple specialized agents,\n", + "each equipped with different tools and expertise. The team collaborates to research topics from\n", + "multiple perspectives, providing comprehensive insights.\n", + "\n", + "**Overview:**\n", + "\n", + "This example creates a research team consisting of four specialized agents:\n", + "\n", + "1. **Reddit Researcher**\n", + " - Focus: Community discussions and user experiences\n", + " - Tools: Google Search (to find Reddit discussions)\n", + " - Expertise: Analyzing user opinions, practical advice, and real-world experiences\n", + " - Role: Provides insights from community perspectives\n", + "\n", + "2. **HackerNews Researcher**\n", + " - Focus: Technical discussions and industry trends\n", + " - Tools: HackerNews API\n", + " - Expertise: Technical analysis and industry insights\n", + " - Role: Provides technical and startup ecosystem perspectives\n", + "\n", + "3. **Academic Paper Researcher**\n", + " - Focus: Scholarly research and evidence-based findings\n", + " - Tools: Google Search + Arxiv API\n", + " - Expertise: Academic literature and research methodology\n", + " - Role: Provides evidence-based academic insights\n", + "\n", + "4. **Twitter Researcher**\n", + " - Focus: Real-time trends and public sentiment\n", + " - Tools: DuckDuckGo Search\n", + " - Expertise: Current events and public opinion\n", + " - Role: Provides real-time social media insights\n", + "\n", + "**Team Collaboration:**\n", + "\n", + "- Mode: Collaborative discussion\n", + "- Coordination: Team uses GPT-4 for discussion management\n", + "- Process: \n", + " 1. Each agent researches independently using their tools\n", + " 2. Agents share findings and discuss implications\n", + " 3. Team works towards consensus through structured discussion\n", + " 4. Discussion continues until comprehensive understanding is reached\n", + "\n", + "**Features Demonstrated:**\n", + "\n", + "- Creating specialized agents with specific research tools\n", + "- Building collaborative teams that discuss and reach consensus\n", + "- Using various research tools (Google Search, HackerNews, Arxiv, DuckDuckGo)\n", + "- Enabling real-time streaming of agent discussions\n", + "- Tracking agent interactions with AgentOps\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c9f7fb8d", + "metadata": {}, + "outputs": [], + "source": [ + "# Install the required dependencies:\n", + "%pip install agentops\n", + "%pip install agno\n", + "%pip install python-dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "08978603", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from textwrap import dedent\n", + "\n", + "import agentops\n", + "from agno.agent import Agent\n", + "from agno.team import Team\n", + "from agno.tools.googlesearch import GoogleSearchTools\n", + "from agno.tools.hackernews import HackerNewsTools\n", + "from agno.tools.arxiv import ArxivTools\n", + "from agno.tools.duckduckgo import DuckDuckGoTools\n", + "from agno.models.openai import OpenAIChat\n", + "from dotenv import load_dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "143be722", + "metadata": {}, + "outputs": [], + "source": [ + "# Load environment variables\n", + "load_dotenv()\n", + "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")\n", + "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_agentops_api_key_here\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2e071b24", + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize AgentOps for monitoring and analytics\n", + "agentops.init(auto_start_session=False, tags=[\"agno-example\", \"research-team\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "8eb6fbe2", + "metadata": {}, + "outputs": [], + "source": [ + "def demonstrate_research_team():\n", + " \"\"\"\n", + " Demonstrate a collaborative research team with multiple specialized agents.\n", + "\n", + " This function creates a team of researchers, each with:\n", + " - Specific expertise and research focus\n", + " - Specialized tools for their domain\n", + " - Custom instructions for their research approach\n", + "\n", + " The team collaborates to provide comprehensive research insights.\n", + " \"\"\"\n", + " tracer = agentops.start_trace(trace_name=\"Agno Research Team Demonstration\")\n", + "\n", + " try:\n", + " print(\"\\n1. Creating specialized research agents...\")\n", + "\n", + " # Reddit Researcher: Focuses on community discussions and user experiences\n", + " reddit_researcher = Agent(\n", + " name=\"Reddit Researcher\",\n", + " role=\"Research a topic on Reddit\",\n", + " model=OpenAIChat(id=\"gpt-4o\"), \n", + " tools=[GoogleSearchTools()], \n", + " add_name_to_instructions=True, \n", + " instructions=dedent(\n", + " \"\"\"\n", + " You are a Reddit researcher specializing in community insights.\n", + " You will be given a topic to research on Reddit.\n", + " Your tasks:\n", + " - Find the most relevant and popular Reddit posts\n", + " - Identify common opinions and experiences from users\n", + " - Highlight both positive and negative perspectives\n", + " - Focus on practical advice and real-world experiences\n", + " \"\"\"\n", + " ),\n", + " )\n", + "\n", + " # HackerNews Researcher: Focuses on technical discussions and industry trends\n", + " hackernews_researcher = Agent(\n", + " name=\"HackerNews Researcher\",\n", + " model=OpenAIChat(\"gpt-4o\"),\n", + " role=\"Research a topic on HackerNews.\",\n", + " tools=[HackerNewsTools()],\n", + " add_name_to_instructions=True,\n", + " instructions=dedent(\n", + " \"\"\"\n", + " You are a HackerNews researcher specializing in technical insights.\n", + " You will be given a topic to research on HackerNews.\n", + " Your tasks:\n", + " - Find the most relevant technical discussions\n", + " - Identify industry trends and expert opinions\n", + " - Focus on technical depth and innovation\n", + " - Highlight startup and technology perspectives\n", + " \"\"\"\n", + " ),\n", + " )\n", + "\n", + " # Academic Paper Researcher: Focuses on scholarly research and evidence\n", + " academic_paper_researcher = Agent(\n", + " name=\"Academic Paper Researcher\",\n", + " model=OpenAIChat(\"gpt-4o\"),\n", + " role=\"Research academic papers and scholarly content\",\n", + " tools=[GoogleSearchTools(), ArxivTools()], \n", + " add_name_to_instructions=True,\n", + " instructions=dedent(\n", + " \"\"\"\n", + " You are an academic paper researcher specializing in scholarly content.\n", + " You will be given a topic to research in academic literature.\n", + " Your tasks:\n", + " - Find relevant scholarly articles, papers, and academic discussions\n", + " - Focus on peer-reviewed content and citations from reputable sources\n", + " - Provide brief summaries of key findings and methodologies\n", + " - Highlight evidence-based conclusions and research gaps\n", + " \"\"\"\n", + " ),\n", + " )\n", + "\n", + " # Twitter Researcher: Focuses on real-time trends and public sentiment\n", + " twitter_researcher = Agent(\n", + " name=\"Twitter Researcher\",\n", + " model=OpenAIChat(\"gpt-4o\"),\n", + " role=\"Research trending discussions and real-time updates\",\n", + " tools=[DuckDuckGoTools()],\n", + " add_name_to_instructions=True,\n", + " instructions=dedent(\n", + " \"\"\"\n", + " You are a Twitter/X researcher specializing in real-time insights.\n", + " You will be given a topic to research on Twitter/X.\n", + " Your tasks:\n", + " - Find trending discussions and influential voices\n", + " - Track real-time updates and breaking news\n", + " - Focus on verified accounts and credible sources\n", + " - Identify relevant hashtags and ongoing conversations\n", + " - Capture public sentiment and viral content\n", + " \"\"\"\n", + " ),\n", + " )\n", + "\n", + " # Create collaborative team with advanced features\n", + " agent_team = Team(\n", + " name=\"Discussion Team\",\n", + " mode=\"collaborate\",\n", + " model=OpenAIChat(\"gpt-4o\"),\n", + " members=[\n", + " reddit_researcher,\n", + " hackernews_researcher,\n", + " academic_paper_researcher,\n", + " twitter_researcher,\n", + " ],\n", + " instructions=[\n", + " \"You are a discussion master coordinating a research team.\",\n", + " \"Facilitate productive discussion between all researchers.\",\n", + " \"Ensure each researcher contributes their unique perspective.\",\n", + " \"Guide the team towards a comprehensive understanding of the topic.\",\n", + " \"You have to stop the discussion when you think the team has reached a consensus.\",\n", + " ],\n", + " success_criteria=\"The team has reached a consensus with insights from all perspectives.\",\n", + " enable_agentic_context=True,\n", + " add_context=True,\n", + " show_tool_calls=True,\n", + " markdown=True,\n", + " debug_mode=True,\n", + " show_members_responses=True,\n", + " )\n", + "\n", + " # Stream the team discussion in real-time\n", + " agent_team.print_response(\n", + " message=\"Start the discussion on the topic: 'What is the best way to learn to code?'\",\n", + " stream=True,\n", + " stream_intermediate_steps=True,\n", + " )\n", + "\n", + " agentops.end_trace(tracer, end_state=\"Success\")\n", + "\n", + " except Exception:\n", + " agentops.end_trace(tracer, end_state=\"Error\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "754f75be", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "demonstrate_research_team()" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all", + "main_language": "python", + "notebook_metadata_filter": "-all" + }, + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/agno/agno_research_team.py b/examples/agno/agno_research_team.py new file mode 100644 index 000000000..073fcf50a --- /dev/null +++ b/examples/agno/agno_research_team.py @@ -0,0 +1,208 @@ +""" +Collaborative Research Team with Agno + +This example demonstrates how to create a sophisticated research team with multiple specialized agents, +each equipped with different tools and expertise. The team collaborates to research topics from +multiple perspectives, providing comprehensive insights. + +Overview: +--------- +This example creates a research team consisting of four specialized agents: + +1. Reddit Researcher + - Focus: Community discussions and user experiences + - Tools: Google Search (to find Reddit discussions) + - Expertise: Analyzing user opinions, practical advice, and real-world experiences + - Role: Provides insights from community perspectives + +2. HackerNews Researcher + - Focus: Technical discussions and industry trends + - Tools: HackerNews API + - Expertise: Technical analysis and industry insights + - Role: Provides technical and startup ecosystem perspectives + +3. Academic Paper Researcher + - Focus: Scholarly research and evidence-based findings + - Tools: Google Search + Arxiv API + - Expertise: Academic literature and research methodology + - Role: Provides evidence-based academic insights + +4. Twitter Researcher + - Focus: Real-time trends and public sentiment + - Tools: DuckDuckGo Search + - Expertise: Current events and public opinion + - Role: Provides real-time social media insights + +Team Collaboration: +------------------ +- Mode: Collaborative discussion +- Coordination: Team uses GPT-4 for discussion management +- Process: + 1. Each agent researches independently using their tools + 2. Agents share findings and discuss implications + 3. Team works towards consensus through structured discussion + 4. Discussion continues until comprehensive understanding is reached + +Features Demonstrated: +--------------------- +- Creating specialized agents with specific research tools +- Building collaborative teams that discuss and reach consensus +- Using various research tools (Google Search, HackerNews, Arxiv, DuckDuckGo) +- Enabling real-time streaming of agent discussions +- Tracking agent interactions with AgentOps +""" + +from textwrap import dedent +from agno.agent import Agent +from agno.team import Team +from agno.tools.googlesearch import GoogleSearchTools +from agno.tools.hackernews import HackerNewsTools +from agno.tools.arxiv import ArxivTools +from agno.tools.duckduckgo import DuckDuckGoTools +from agno.models.openai import OpenAIChat +import agentops +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +# Initialize AgentOps for monitoring and analytics +agentops.init(auto_start_session=False, tags=["agno-example", "research-team"]) + + +def demonstrate_research_team(): + """ + Demonstrate a collaborative research team with multiple specialized agents. + + This function creates a team of researchers, each with: + - Specific expertise and research focus + - Specialized tools for their domain + - Custom instructions for their research approach + + The team collaborates to provide comprehensive research insights. + """ + tracer = agentops.start_trace(trace_name="Agno Research Team Demonstration") + + try: + # Reddit Researcher: Focuses on community discussions and user experiences + reddit_researcher = Agent( + name="Reddit Researcher", + role="Research a topic on Reddit", + model=OpenAIChat(id="gpt-4o"), + tools=[GoogleSearchTools()], + add_name_to_instructions=True, + instructions=dedent( + """ + You are a Reddit researcher specializing in community insights. + You will be given a topic to research on Reddit. + Your tasks: + - Find the most relevant and popular Reddit posts + - Identify common opinions and experiences from users + - Highlight both positive and negative perspectives + - Focus on practical advice and real-world experiences + """ + ), + ) + + # HackerNews Researcher: Focuses on technical discussions and industry trends + hackernews_researcher = Agent( + name="HackerNews Researcher", + model=OpenAIChat("gpt-4o"), + role="Research a topic on HackerNews.", + tools=[HackerNewsTools()], + add_name_to_instructions=True, + instructions=dedent( + """ + You are a HackerNews researcher specializing in technical insights. + You will be given a topic to research on HackerNews. + Your tasks: + - Find the most relevant technical discussions + - Identify industry trends and expert opinions + - Focus on technical depth and innovation + - Highlight startup and technology perspectives + """ + ), + ) + + # Academic Paper Researcher: Focuses on scholarly research and evidence + academic_paper_researcher = Agent( + name="Academic Paper Researcher", + model=OpenAIChat("gpt-4o"), + role="Research academic papers and scholarly content", + tools=[GoogleSearchTools(), ArxivTools()], + add_name_to_instructions=True, + instructions=dedent( + """ + You are an academic paper researcher specializing in scholarly content. + You will be given a topic to research in academic literature. + Your tasks: + - Find relevant scholarly articles, papers, and academic discussions + - Focus on peer-reviewed content and citations from reputable sources + - Provide brief summaries of key findings and methodologies + - Highlight evidence-based conclusions and research gaps + """ + ), + ) + + # Twitter Researcher: Focuses on real-time trends and public sentiment + twitter_researcher = Agent( + name="Twitter Researcher", + model=OpenAIChat("gpt-4o"), + role="Research trending discussions and real-time updates", + tools=[DuckDuckGoTools()], + add_name_to_instructions=True, + instructions=dedent( + """ + You are a Twitter/X researcher specializing in real-time insights. + You will be given a topic to research on Twitter/X. + Your tasks: + - Find trending discussions and influential voices + - Track real-time updates and breaking news + - Focus on verified accounts and credible sources + - Identify relevant hashtags and ongoing conversations + - Capture public sentiment and viral content + """ + ), + ) + + # Create collaborative team with advanced features + agent_team = Team( + name="Discussion Team", + mode="collaborate", + model=OpenAIChat("gpt-4o"), + members=[ + reddit_researcher, + hackernews_researcher, + academic_paper_researcher, + twitter_researcher, + ], + instructions=[ + "You are a discussion master coordinating a research team.", + "Facilitate productive discussion between all researchers.", + "Ensure each researcher contributes their unique perspective.", + "Guide the team towards a comprehensive understanding of the topic.", + "You have to stop the discussion when you think the team has reached a consensus.", + ], + success_criteria="The team has reached a consensus with insights from all perspectives.", + enable_agentic_context=True, + add_context=True, + show_tool_calls=True, + markdown=True, + debug_mode=True, + show_members_responses=True, + ) + + # Stream the team discussion in real-time + agent_team.print_response( + message="Start the discussion on the topic: 'What is the best way to learn to code?'", + stream=True, + stream_intermediate_steps=True, + ) + + agentops.end_trace(tracer, end_state="Success") + + except Exception: + agentops.end_trace(tracer, end_state="Error") + + +demonstrate_research_team() diff --git a/examples/agno/agno_tool_integrations.ipynb b/examples/agno/agno_tool_integrations.ipynb new file mode 100644 index 000000000..1d3ac06e6 --- /dev/null +++ b/examples/agno/agno_tool_integrations.ipynb @@ -0,0 +1,173 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "813b80b7", + "metadata": {}, + "source": [ + "\n", + "# Tool Integration with RAG (Retrieval-Augmented Generation) in Agno\n", + "\n", + "This example demonstrates how to enhance Agno agents with RAG capabilities, allowing them to access and reason over external knowledge bases for more accurate and source-backed responses.\n", + "\n", + "**Overview**\n", + "\n", + "This example shows how to integrate RAG with Agno agents where we:\n", + "\n", + "1. **Set up a knowledge base** with documents, URLs, and other external sources\n", + "2. **Configure vector databases** (like Pinecone, Weaviate, or ChromaDB) for efficient semantic search\n", + "3. **Implement retrieval** using embeddings and reranking for accurate information access\n", + "4. **Create RAG-enabled agents** that can search, retrieve, and reason over the knowledge base\n", + "\n", + "By using RAG, agents can provide responses backed by external sources rather than relying solely on their training data, significantly improving accuracy and verifiability of their outputs.\n", + "\n", + "RAG enables agents to access and reason over large knowledge bases,\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dfa7ae6b", + "metadata": {}, + "outputs": [], + "source": [ + "# Install the required dependencies:\n", + "%pip install agentops\n", + "%pip install agno\n", + "%pip install python-dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "208656e8", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from dotenv import load_dotenv\n", + "\n", + "import agentops\n", + "from agno.agent import Agent\n", + "from agno.models.openai import OpenAIChat\n", + "\n", + "# Knowledge & RAG components\n", + "from agno.knowledge.url import UrlKnowledge\n", + "from agno.vectordb.lancedb import LanceDb\n", + "from agno.vectordb.search import SearchType\n", + "from agno.embedder.cohere import CohereEmbedder\n", + "from agno.reranker.cohere import CohereReranker\n", + "from agno.tools.reasoning import ReasoningTools" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "50eedd68", + "metadata": {}, + "outputs": [], + "source": [ + "# Load environment variables\n", + "load_dotenv()\n", + "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")\n", + "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_agentops_api_key_here\")\n", + "os.environ[\"COHERE_API_KEY\"] = os.getenv(\"COHERE_API_KEY\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "51235ba1", + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize AgentOps for monitoring\n", + "agentops.init(auto_start_session=False, tags=[\"agno-example\", \"tool-integrations\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "24da7b0b", + "metadata": {}, + "outputs": [], + "source": [ + "def demonstrate_tool_integration():\n", + " \"\"\"\n", + " Demonstrate advanced tool integration with RAG and knowledge bases.\n", + "\n", + " This function shows how to:\n", + " 1. Create a knowledge base from external sources\n", + " 2. Set up a vector database with embeddings\n", + " 3. Configure an agent with RAG capabilities\n", + " 4. Enable reasoning tools for complex problem-solving\n", + " \"\"\"\n", + " tracer = agentops.start_trace(trace_name=\"Agno Tool Integration Demonstration\")\n", + " try:\n", + " # Create knowledge base from documentation URLs\n", + " # This loads content from the specified URLs and prepares it for RAG\n", + " knowledge_base = UrlKnowledge(\n", + " urls=[\"https://docs.agno.com/introduction/agents.md\"],\n", + " vector_db=LanceDb(\n", + " uri=\"tmp/lancedb\",\n", + " table_name=\"agno_docs\",\n", + " search_type=SearchType.hybrid,\n", + " embedder=CohereEmbedder(\n", + " id=\"embed-v4.0\",\n", + " \n", + " ),\n", + " reranker=CohereReranker(\n", + " model=\"rerank-v3.5\",\n", + " \n", + " ),\n", + " ),\n", + " )\n", + "\n", + " # Create an intelligent agent with RAG capabilities\n", + " agent = Agent(\n", + " model=OpenAIChat(id=\"gpt-4o-mini\"),\n", + " knowledge=knowledge_base,\n", + " search_knowledge=True,\n", + " tools=[ReasoningTools(add_instructions=True)],\n", + " instructions=[\n", + " \"Include sources in your response.\",\n", + " \"Always search your knowledge before answering the question.\",\n", + " \"Only include the output in your response. No other text.\",\n", + " ],\n", + " )\n", + "\n", + " # Print response with full reasoning process visible\n", + " agent.print_response(\n", + " \"What are Agents?\",\n", + " show_full_reasoning=True,\n", + " )\n", + " agentops.end_trace(tracer, end_state=\"Success\")\n", + " except Exception:\n", + " agentops.end_trace(tracer, end_state=\"Error\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3cd3e2dc", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "demonstrate_tool_integration()" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all", + "main_language": "python", + "notebook_metadata_filter": "-all" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/agno/agno_tool_integrations.py b/examples/agno/agno_tool_integrations.py new file mode 100644 index 000000000..6cc0d0b4a --- /dev/null +++ b/examples/agno/agno_tool_integrations.py @@ -0,0 +1,96 @@ +""" +# Tool Integration with RAG (Retrieval-Augmented Generation) in Agno + +This example demonstrates how to enhance Agno agents with RAG capabilities, allowing them to access and reason over external knowledge bases for more accurate and source-backed responses. + +## Overview +This example shows how to integrate RAG with Agno agents where we: + +1. **Set up a knowledge base** with documents, URLs, and other external sources +2. **Configure vector databases** (like Pinecone, Weaviate, or ChromaDB) for efficient semantic search +3. **Implement retrieval** using embeddings and reranking for accurate information access +4. **Create RAG-enabled agents** that can search, retrieve, and reason over the knowledge base + +By using RAG, agents can provide responses backed by external sources rather than relying solely on their training data, significantly improving accuracy and verifiability of their outputs. + +RAG enables agents to access and reason over large knowledge bases, +providing accurate, source-backed responses instead of relying solely on training data. +""" + +import os +from agno.agent import Agent +from agno.models.openai import OpenAIChat +import agentops +from dotenv import load_dotenv + +# Knowledge and RAG components +from agno.knowledge.url import UrlKnowledge +from agno.vectordb.lancedb import LanceDb +from agno.vectordb.search import SearchType +from agno.embedder.cohere import CohereEmbedder +from agno.reranker.cohere import CohereReranker +from agno.tools.reasoning import ReasoningTools + +# Load environment variables +load_dotenv() + +# Initialize AgentOps for monitoring +agentops.init(auto_start_session=False, tags=["agno-example", "tool-integrations"]) + +# API keys and configuration +os.environ["COHERE_API_KEY"] = os.getenv("COHERE_API_KEY") + + +def demonstrate_tool_integration(): + """ + Demonstrate advanced tool integration with RAG and knowledge bases. + + This function shows how to: + 1. Create a knowledge base from external sources + 2. Set up a vector database with embeddings + 3. Configure an agent with RAG capabilities + 4. Enable reasoning tools for complex problem-solving + """ + tracer = agentops.start_trace(trace_name="Agno Tool Integration Demonstration") + try: + # Create knowledge base from documentation URLs + # This loads content from the specified URLs and prepares it for RAG + knowledge_base = UrlKnowledge( + urls=["https://docs.agno.com/introduction/agents.md"], + vector_db=LanceDb( + uri="tmp/lancedb", + table_name="agno_docs", + search_type=SearchType.hybrid, + embedder=CohereEmbedder( + id="embed-v4.0", + ), + reranker=CohereReranker( + model="rerank-v3.5", + ), + ), + ) + + # Create an intelligent agent with RAG capabilities + agent = Agent( + model=OpenAIChat(id="gpt-4o-mini"), + knowledge=knowledge_base, + search_knowledge=True, + tools=[ReasoningTools(add_instructions=True)], + instructions=[ + "Include sources in your response.", + "Always search your knowledge before answering the question.", + "Only include the output in your response. No other text.", + ], + ) + + # Print response with full reasoning process visible + agent.print_response( + "What are Agents?", + show_full_reasoning=True, + ) + agentops.end_trace(tracer, end_state="Success") + except Exception: + agentops.end_trace(tracer, end_state="Error") + + +demonstrate_tool_integration() diff --git a/examples/agno/agno_workflow_setup.ipynb b/examples/agno/agno_workflow_setup.ipynb new file mode 100644 index 000000000..107792d46 --- /dev/null +++ b/examples/agno/agno_workflow_setup.ipynb @@ -0,0 +1,218 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "2ac8b4d4", + "metadata": {}, + "source": [ + "\n", + "# Workflow Setup with Caching in Agno\n", + "\n", + "This example demonstrates how to create efficient, stateful workflows in Agno that orchestrate complex agent interactions while maintaining performance through caching and state management.\n", + "\n", + "## Overview\n", + "This example shows how to build reusable agent workflows where we:\n", + "\n", + "1. **Design workflow architecture** with custom logic and agent orchestration\n", + "2. **Implement caching mechanisms** to store and reuse expensive computations\n", + "3. **Manage session state** to maintain context across multiple interactions\n", + "4. **Set up response streaming** for real-time output handling\n", + "\n", + "By using workflows, you can create sophisticated agent pipelines that are both performant and maintainable, with built-in optimizations for repeated operations and long-running sessions.\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40bf672a", + "metadata": {}, + "outputs": [], + "source": [ + "# Install the required dependencies:\n", + "%pip install agentops\n", + "%pip install agno\n", + "%pip install python-dotenv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ee482533", + "metadata": { + "lines_to_next_cell": 2 + }, + "outputs": [], + "source": [ + "import os\n", + "from dotenv import load_dotenv\n", + "\n", + "import agentops\n", + "from agno.agent import Agent, RunResponse\n", + "from agno.workflow import Workflow\n", + "from agno.utils.pprint import pprint_run_response\n", + "from agno.models.openai import OpenAIChat\n", + "from agno.utils.log import logger\n", + "from typing import Iterator" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eddbb872", + "metadata": {}, + "outputs": [], + "source": [ + "# Load environment variables\n", + "load_dotenv()\n", + "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")\n", + "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_agentops_api_key_here\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6f234791", + "metadata": {}, + "outputs": [], + "source": [ + "agentops.init(auto_start_session=False, tags=[\"agno-example\", \"workflow-setup\"])" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "ff36679e", + "metadata": {}, + "outputs": [], + "source": [ + "class CacheWorkflow(Workflow):\n", + " \"\"\"\n", + " A workflow that demonstrates intelligent caching capabilities.\n", + "\n", + " This workflow:\n", + " - Caches agent responses to avoid redundant API calls\n", + " - Maintains session state across multiple invocations\n", + " - Provides instant responses for repeated queries\n", + " - Reduces costs and improves performance\n", + "\n", + " Use cases:\n", + " - FAQ systems where questions repeat frequently\n", + " - Development/testing to avoid repeated API calls\n", + " - Systems with predictable query patterns\n", + " \"\"\"\n", + "\n", + " # Workflow metadata (descriptive, not functional)\n", + " description: str = \"A workflow that caches previous outputs for efficiency\"\n", + "\n", + " # Initialize agents as workflow attributes\n", + " # This agent will be used to generate responses when cache misses occur\n", + " agent = Agent(model=OpenAIChat(id=\"gpt-4o-mini\"), description=\"General purpose agent for generating responses\")\n", + "\n", + " def run(self, message: str) -> Iterator[RunResponse]:\n", + " \"\"\"\n", + " Execute the workflow with caching logic.\n", + "\n", + " This method:\n", + " 1. Checks if the response is already cached\n", + " 2. Returns cached response immediately if found\n", + " 3. Generates new response if not cached\n", + " 4. Caches the new response for future use\n", + "\n", + " Args:\n", + " message: The input query to process\n", + "\n", + " Yields:\n", + " RunResponse: Streamed response chunks\n", + " \"\"\"\n", + " logger.info(f\"Checking cache for '{message}'\")\n", + "\n", + " if self.session_state.get(message):\n", + " logger.info(f\"Cache hit for '{message}'\")\n", + " # Return cached response immediately (no API call needed)\n", + " yield RunResponse(run_id=self.run_id, content=self.session_state.get(message))\n", + " return\n", + "\n", + " logger.info(f\"Cache miss for '{message}'\")\n", + "\n", + " yield from self.agent.run(message, stream=True)\n", + "\n", + " self.session_state[message] = self.agent.run_response.content\n", + " logger.info(\"Cached response for future use\")" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "b1e9e06c", + "metadata": {}, + "outputs": [], + "source": [ + "def demonstrate_workflows():\n", + " \"\"\"\n", + " Demonstrate workflow capabilities with caching.\n", + "\n", + " This function shows:\n", + " - How to create and use custom workflows\n", + " - The performance benefits of caching\n", + " - Session state persistence\n", + " - Response streaming\n", + " \"\"\"\n", + "\n", + " tracer = agentops.start_trace(trace_name=\"Agno Workflow Setup Demonstration\")\n", + " try:\n", + " workflow = CacheWorkflow()\n", + "\n", + " response: Iterator[RunResponse] = workflow.run(message=\"Tell me a joke.\")\n", + "\n", + " pprint_run_response(response, markdown=True, show_time=True)\n", + "\n", + " response: Iterator[RunResponse] = workflow.run(message=\"Tell me a joke.\")\n", + "\n", + " pprint_run_response(response, markdown=True, show_time=True)\n", + "\n", + " agentops.end_trace(tracer, end_state=\"Success\")\n", + "\n", + " except Exception:\n", + " agentops.end_trace(tracer, end_state=\"Error\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d09d8f70", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "demonstrate_workflows()" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all", + "main_language": "python", + "notebook_metadata_filter": "-all" + }, + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/agno/agno_workflow_setup.py b/examples/agno/agno_workflow_setup.py new file mode 100644 index 000000000..d3aa0c9ea --- /dev/null +++ b/examples/agno/agno_workflow_setup.py @@ -0,0 +1,117 @@ +""" +# Workflow Setup with Caching in Agno + +This example demonstrates how to create efficient, stateful workflows in Agno that orchestrate complex agent interactions while maintaining performance through caching and state management. + +## Overview +This example shows how to build reusable agent workflows where we: + +1. **Design workflow architecture** with custom logic and agent orchestration +2. **Implement caching mechanisms** to store and reuse expensive computations +3. **Manage session state** to maintain context across multiple interactions +4. **Set up response streaming** for real-time output handling + +By using workflows, you can create sophisticated agent pipelines that are both performant and maintainable, with built-in optimizations for repeated operations and long-running sessions. + +""" + +from agno.agent import Agent, RunResponse +import asyncio +import agentops +from dotenv import load_dotenv +from agno.workflow import Workflow +from agno.utils.pprint import pprint_run_response +from agno.models.openai import OpenAIChat +from agno.utils.log import logger +from typing import Iterator + + +load_dotenv() +agentops.init(auto_start_session=False, tags=["agno-example", "workflow-setup"]) + + +class CacheWorkflow(Workflow): + """ + A workflow that demonstrates intelligent caching capabilities. + + This workflow: + - Caches agent responses to avoid redundant API calls + - Maintains session state across multiple invocations + - Provides instant responses for repeated queries + - Reduces costs and improves performance + + Use cases: + - FAQ systems where questions repeat frequently + - Development/testing to avoid repeated API calls + - Systems with predictable query patterns + """ + + # Workflow metadata (descriptive, not functional) + description: str = "A workflow that caches previous outputs for efficiency" + + # Initialize agents as workflow attributes + # This agent will be used to generate responses when cache misses occur + agent = Agent(model=OpenAIChat(id="gpt-4o-mini"), description="General purpose agent for generating responses") + + def run(self, message: str) -> Iterator[RunResponse]: + """ + Execute the workflow with caching logic. + + This method: + 1. Checks if the response is already cached + 2. Returns cached response immediately if found + 3. Generates new response if not cached + 4. Caches the new response for future use + + Args: + message: The input query to process + + Yields: + RunResponse: Streamed response chunks + """ + logger.info(f"Checking cache for '{message}'") + + if self.session_state.get(message): + logger.info(f"Cache hit for '{message}'") + # Return cached response immediately (no API call needed) + yield RunResponse(run_id=self.run_id, content=self.session_state.get(message)) + return + + logger.info(f"Cache miss for '{message}'") + + yield from self.agent.run(message, stream=True) + + self.session_state[message] = self.agent.run_response.content + logger.info("Cached response for future use") + + +def demonstrate_workflows(): + """ + Demonstrate workflow capabilities with caching. + + This function shows: + - How to create and use custom workflows + - The performance benefits of caching + - Session state persistence + - Response streaming + """ + + tracer = agentops.start_trace(trace_name="Agno Workflow Setup Demonstration") + try: + workflow = CacheWorkflow() + + response: Iterator[RunResponse] = workflow.run(message="Tell me a joke.") + + pprint_run_response(response, markdown=True, show_time=True) + + response: Iterator[RunResponse] = workflow.run(message="Tell me a joke.") + + pprint_run_response(response, markdown=True, show_time=True) + + agentops.end_trace(tracer, end_state="Success") + + except Exception: + agentops.end_trace(tracer, end_state="Error") + + +asyncio.run(demonstrate_workflows())