|
| 1 | +import copy |
| 2 | +from collections import defaultdict |
| 3 | +from time import time |
| 4 | +from typing import Dict, Generic, List, Optional, Sequence, TypeVar |
| 5 | + |
| 6 | +from opentelemetry.sdk.trace import ReadableSpan |
| 7 | +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult |
| 8 | + |
| 9 | +from uipath.eval._helpers import auto_discover_entrypoint |
| 10 | + |
| 11 | +from .._runtime._contracts import ( |
| 12 | + UiPathBaseRuntime, |
| 13 | + UiPathRuntimeContext, |
| 14 | + UiPathRuntimeFactory, |
| 15 | + UiPathRuntimeResult, |
| 16 | + UiPathRuntimeStatus, |
| 17 | +) |
| 18 | +from .._utils._eval_set import EvalHelpers |
| 19 | +from ._models import EvaluationItem |
| 20 | +from ._models._agent_execution_output import UiPathEvalRunExecutionOutput |
| 21 | + |
| 22 | +T = TypeVar("T", bound=UiPathBaseRuntime) |
| 23 | +C = TypeVar("C", bound=UiPathRuntimeContext) |
| 24 | + |
| 25 | + |
| 26 | +class ExecutionSpanExporter(SpanExporter): |
| 27 | + """Custom exporter that stores spans grouped by execution ids.""" |
| 28 | + |
| 29 | + def __init__(self): |
| 30 | + # { execution_id -> list of spans } |
| 31 | + self._spans: Dict[str, List[ReadableSpan]] = defaultdict(list) |
| 32 | + |
| 33 | + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: |
| 34 | + for span in spans: |
| 35 | + if span.attributes is not None: |
| 36 | + exec_id = span.attributes.get("execution.id") |
| 37 | + if exec_id is not None and isinstance(exec_id, str): |
| 38 | + self._spans[exec_id].append(span) |
| 39 | + |
| 40 | + return SpanExportResult.SUCCESS |
| 41 | + |
| 42 | + def get_spans(self, execution_id: str) -> List[ReadableSpan]: |
| 43 | + """Retrieve spans for a given execution id.""" |
| 44 | + return self._spans.get(execution_id, []) |
| 45 | + |
| 46 | + def clear(self, execution_id: Optional[str] = None) -> None: |
| 47 | + """Clear stored spans for one or all executions.""" |
| 48 | + if execution_id: |
| 49 | + self._spans.pop(execution_id, None) |
| 50 | + else: |
| 51 | + self._spans.clear() |
| 52 | + |
| 53 | + def shutdown(self) -> None: |
| 54 | + self.clear() |
| 55 | + |
| 56 | + |
| 57 | +class UiPathEvalContext(UiPathRuntimeContext, Generic[C]): |
| 58 | + """Context used for evaluation runs.""" |
| 59 | + |
| 60 | + runtime_context: C |
| 61 | + no_report: bool |
| 62 | + workers: int |
| 63 | + eval_set: Optional[str] = None |
| 64 | + eval_ids: Optional[List[str]] = None |
| 65 | + |
| 66 | + def __init__( |
| 67 | + self, |
| 68 | + runtime_context: C, |
| 69 | + no_report: bool, |
| 70 | + workers: int, |
| 71 | + eval_set: Optional[str] = None, |
| 72 | + eval_ids: Optional[List[str]] = None, |
| 73 | + **kwargs, |
| 74 | + ): |
| 75 | + super().__init__( |
| 76 | + runtime_context=runtime_context, # type: ignore |
| 77 | + no_report=no_report, |
| 78 | + workers=workers, |
| 79 | + eval_set=eval_set, |
| 80 | + eval_ids=eval_ids, |
| 81 | + **kwargs, |
| 82 | + ) |
| 83 | + self._auto_discover() |
| 84 | + |
| 85 | + def _auto_discover(self): |
| 86 | + self.runtime_context.entrypoint = ( |
| 87 | + self.runtime_context.entrypoint or auto_discover_entrypoint() |
| 88 | + ) |
| 89 | + self.eval_set = self.eval_set or EvalHelpers.auto_discover_eval_set() |
| 90 | + |
| 91 | + |
| 92 | +class UiPathEvalRuntime(UiPathBaseRuntime, Generic[T, C]): |
| 93 | + """Specialized runtime for evaluation runs, with access to the factory.""" |
| 94 | + |
| 95 | + def __init__( |
| 96 | + self, context: "UiPathEvalContext[C]", factory: "UiPathRuntimeFactory[T, C]" |
| 97 | + ): |
| 98 | + super().__init__(context) |
| 99 | + self.context: "UiPathEvalContext[C]" = context |
| 100 | + self.factory: UiPathRuntimeFactory[T, C] = factory |
| 101 | + self.span_exporter: ExecutionSpanExporter = ExecutionSpanExporter() |
| 102 | + self.factory.add_span_exporter(self.span_exporter) |
| 103 | + |
| 104 | + @classmethod |
| 105 | + def from__eval_context( |
| 106 | + cls, |
| 107 | + context: "UiPathEvalContext[C]", |
| 108 | + factory: "UiPathRuntimeFactory[T, C]", |
| 109 | + ) -> "UiPathEvalRuntime[T, C]": |
| 110 | + return cls(context, factory) |
| 111 | + |
| 112 | + async def execute(self) -> Optional[UiPathRuntimeResult]: |
| 113 | + """Evaluation logic. Can spawn other runtimes through the factory.""" |
| 114 | + if self.context.eval_set is None: |
| 115 | + raise ValueError("eval_set must be provided for evaluation runs") |
| 116 | + |
| 117 | + evaluation_set = EvalHelpers.load_eval_set( |
| 118 | + self.context.eval_set, self.context.eval_ids |
| 119 | + ) |
| 120 | + execution_output_list: list[UiPathEvalRunExecutionOutput] = [] |
| 121 | + for eval_item in evaluation_set.evaluations: |
| 122 | + execution_output_list.append(await self.execute_agent(eval_item)) |
| 123 | + |
| 124 | + self.context.result = UiPathRuntimeResult( |
| 125 | + output={ |
| 126 | + "results": execution_output_list, |
| 127 | + }, |
| 128 | + status=UiPathRuntimeStatus.SUCCESSFUL, |
| 129 | + resume=None, |
| 130 | + ) |
| 131 | + |
| 132 | + return self.context.runtime_context.result |
| 133 | + |
| 134 | + def _prepare_new_runtime_context(self, eval_item: EvaluationItem) -> C: |
| 135 | + runtime_context = copy.deepcopy(self.context.runtime_context) |
| 136 | + runtime_context.execution_id = eval_item.id |
| 137 | + runtime_context.input_json = eval_item.inputs |
| 138 | + # here we can pass other values from eval_item: expectedAgentBehavior, simulationInstructions etc. |
| 139 | + return runtime_context |
| 140 | + |
| 141 | + # TODO: this would most likely need to be ported to a public AgentEvaluator class |
| 142 | + async def execute_agent( |
| 143 | + self, eval_item: EvaluationItem |
| 144 | + ) -> "UiPathEvalRunExecutionOutput": |
| 145 | + runtime_context = self._prepare_new_runtime_context(eval_item) |
| 146 | + start_time = time() |
| 147 | + result = await self.factory.execute_in_root_span( |
| 148 | + runtime_context, root_span=eval_item.name |
| 149 | + ) |
| 150 | + end_time = time() |
| 151 | + if runtime_context.execution_id is None: |
| 152 | + raise ValueError("execution_id must be set for eval runs") |
| 153 | + |
| 154 | + spans = self.span_exporter.get_spans(runtime_context.execution_id) |
| 155 | + self.span_exporter.clear(runtime_context.execution_id) |
| 156 | + |
| 157 | + if result is None: |
| 158 | + raise ValueError("Execution result cannot be None for eval runs") |
| 159 | + |
| 160 | + return UiPathEvalRunExecutionOutput( |
| 161 | + execution_time=end_time - start_time, |
| 162 | + spans=spans, |
| 163 | + result=result, |
| 164 | + ) |
| 165 | + |
| 166 | + async def cleanup(self) -> None: |
| 167 | + """Cleanup runtime resources.""" |
| 168 | + pass |
| 169 | + |
| 170 | + async def validate(self) -> None: |
| 171 | + """Cleanup runtime resources.""" |
| 172 | + pass |
0 commit comments