diff --git a/src/uipath/_cli/_evals/_evaluation_strategy.py b/src/uipath/_cli/_evals/_evaluation_strategy.py new file mode 100644 index 000000000..35822cfbe --- /dev/null +++ b/src/uipath/_cli/_evals/_evaluation_strategy.py @@ -0,0 +1,297 @@ +"""Strategy pattern for evaluation execution. + +Decouples how evaluators are run from the evaluation runtime, allowing +local execution (current behavior) or remote execution via the Agents backend. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any, Protocol + +from pydantic import BaseModel + +from uipath.eval.models.models import AgentExecution, EvalItemResult, EvaluationResult, ScoreType + +if TYPE_CHECKING: + from uipath._cli._evals._models._evaluation_set import EvaluationItem + from uipath._cli._evals._models._output import ( + EvaluationResultDto, + EvaluationRunResultDto, + UiPathEvalRunExecutionOutput, + ) + from uipath._cli._evals._remote_evaluator import ( + RemoteEvaluationClient, + RemoteEvaluationStatusResponse, + ) + from uipath.eval.evaluators.base_evaluator import GenericBaseEvaluator + +logger = logging.getLogger(__name__) + + +class EvaluationStrategy(Protocol): + """Protocol for evaluation execution strategies.""" + + async def evaluate( + self, + eval_item: EvaluationItem, + evaluators: list[GenericBaseEvaluator[Any, Any, Any]], + execution_output: UiPathEvalRunExecutionOutput, + run_evaluator_fn: Any, + ) -> list[EvalItemResult]: + """Run evaluators against an agent execution output. + + Args: + eval_item: The evaluation item being evaluated. + evaluators: List of evaluator instances to run. + execution_output: The agent execution output to evaluate. + run_evaluator_fn: Async callable to run a single evaluator + (signature matches UiPathEvalRuntime.run_evaluator). + + Returns: + List of evaluation item results. + """ + ... + + +class LocalEvaluationStrategy: + """Runs evaluators locally in the CLI process. + + This is the default strategy that preserves the existing behavior + where evaluators execute in the same process as the agent. + """ + + async def evaluate( + self, + eval_item: EvaluationItem, + evaluators: list[GenericBaseEvaluator[Any, Any, Any]], + execution_output: UiPathEvalRunExecutionOutput, + run_evaluator_fn: Any, + ) -> list[EvalItemResult]: + evaluation_item_results: list[EvalItemResult] = [] + + for evaluator in evaluators: + if evaluator.id not in eval_item.evaluation_criterias: + continue + + evaluation_criteria = eval_item.evaluation_criterias[evaluator.id] + + evaluation_result = await run_evaluator_fn( + evaluator=evaluator, + execution_output=execution_output, + eval_item=eval_item, + evaluation_criteria=evaluator.evaluation_criteria_type( + **evaluation_criteria + ) + if evaluation_criteria + else None, + ) + + evaluation_item_results.append( + EvalItemResult( + evaluator_id=evaluator.id, + result=evaluation_result, + ) + ) + + return evaluation_item_results + + +class RemoteEvaluationStrategy: + """Submits evaluations to the remote C# Agents backend. + + The backend runs evaluators via Temporal workflows and reports results + to Studio Web. The CLI only needs to poll for results. 
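+
+    Evaluation items that reference custom file:// evaluators, and submissions
+    that fail or cannot be polled to completion, fall back to
+    LocalEvaluationStrategy so the run still completes.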
+ """ + + def __init__( + self, + client: RemoteEvaluationClient, + eval_set_run_id: str, + eval_set_id: str, + project_id: str, + entrypoint: str = "", + is_coded: bool = True, + report_to_studio_web: bool = True, + ): + self._client = client + self._eval_set_run_id = eval_set_run_id + self._eval_set_id = eval_set_id + self._project_id = project_id + self._entrypoint = entrypoint + self._is_coded = is_coded + self._report_to_studio_web = report_to_studio_web + + async def evaluate( + self, + eval_item: EvaluationItem, + evaluators: list[GenericBaseEvaluator[Any, Any, Any]], + execution_output: UiPathEvalRunExecutionOutput, + run_evaluator_fn: Any, + ) -> list[EvalItemResult]: + from uipath._cli._evals._remote_evaluator import ( + EvaluationItemPayload, + EvaluatorConfigPayload, + RemoteEvaluationRequest, + RemoteJobStatus, + ) + from uipath.eval.models.models import ( + BooleanEvaluationResult, + ErrorEvaluationResult, + NumericEvaluationResult, + ) + from uipath.eval.models.serializable_span import SerializableSpan + + # Check for custom file:// evaluators — these can't run remotely + from uipath._utils.constants import CUSTOM_EVALUATOR_PREFIX + + has_custom = False + for evaluator in evaluators: + if evaluator.id not in eval_item.evaluation_criterias: + continue + evaluator_schema = getattr(evaluator, "evaluator_schema", "") or "" + if isinstance(evaluator_schema, str) and evaluator_schema.startswith( + CUSTOM_EVALUATOR_PREFIX + ): + has_custom = True + break + + if has_custom: + logger.warning( + f"Eval item '{eval_item.name}' has custom file:// evaluators. " + "Falling back to local evaluation." + ) + local = LocalEvaluationStrategy() + return await local.evaluate( + eval_item, evaluators, execution_output, run_evaluator_fn + ) + + # Serialize traces + serialized_traces = [] + for span in execution_output.spans: + try: + serialized_traces.append(SerializableSpan.from_readable_span(span)) + except Exception as e: + logger.warning(f"Skipping span serialization error: {e}") + + # Build agent output + agent_output: dict[str, Any] | str = {} + if execution_output.result.output: + if isinstance(execution_output.result.output, BaseModel): + agent_output = execution_output.result.output.model_dump() + else: + agent_output = execution_output.result.output + + # Build evaluator configs + evaluator_configs = [] + for evaluator in evaluators: + if evaluator.id not in eval_item.evaluation_criterias: + continue + evaluator_configs.append( + EvaluatorConfigPayload( + id=evaluator.id, + version=getattr(evaluator, "version", "1.0") or "1.0", + evaluatorTypeId=getattr(evaluator, "evaluator_type_id", evaluator.id), + evaluatorConfig=getattr(evaluator, "evaluator_config", {}) or {}, + evaluatorSchema=getattr(evaluator, "evaluator_schema", "") or "", + ) + ) + + # Build evaluation item payload + agent_error = None + if execution_output.result.error: + agent_error = str(execution_output.result.error.detail) + + evaluation_item_payload = EvaluationItemPayload( + id=eval_item.id, + name=eval_item.name, + inputs=eval_item.inputs, + evaluationCriterias=eval_item.evaluation_criterias, + expectedAgentBehavior=eval_item.expected_agent_behavior, + agentOutput=agent_output, + agentExecutionTime=execution_output.execution_time, + serializedTraces=serialized_traces, + agentError=agent_error, + ) + + # Submit to remote backend + request = RemoteEvaluationRequest( + evalSetRunId=self._eval_set_run_id, + evalSetId=self._eval_set_id, + projectId=self._project_id, + entrypoint=self._entrypoint, + isCoded=self._is_coded, + 
reportToStudioWeb=self._report_to_studio_web, + evaluatorConfigs=evaluator_configs, + evaluationItems=[evaluation_item_payload], + ) + + try: + submit_response = await self._client.submit_evaluation(request) + status_response = await self._client.poll_status( + submit_response.evaluation_job_id + ) + except Exception as e: + logger.warning( + f"Remote evaluation failed for '{eval_item.name}': {e}. " + "Falling back to local evaluation." + ) + local = LocalEvaluationStrategy() + return await local.evaluate( + eval_item, evaluators, execution_output, run_evaluator_fn + ) + + # Convert remote results to EvalItemResult list + return _convert_remote_results(status_response, eval_item.id) + + +def _convert_remote_results( + status_response: RemoteEvaluationStatusResponse, + eval_item_id: str, +) -> list[EvalItemResult]: + """Convert remote evaluation results to local EvalItemResult format.""" + from uipath._cli._evals._remote_evaluator import RemoteJobStatus + from uipath.eval.models.models import ( + BooleanEvaluationResult, + ErrorEvaluationResult, + NumericEvaluationResult, + ) + + results: list[EvalItemResult] = [] + + # Find results for this eval item + for item_result in status_response.results: + if item_result.evaluation_item_id != eval_item_id: + continue + + for evaluator_result in item_result.evaluator_results: + score_type = ScoreType(evaluator_result.score_type) + + evaluation_result: EvaluationResult + if score_type == ScoreType.BOOLEAN: + evaluation_result = BooleanEvaluationResult( + score=bool(evaluator_result.score), + details=evaluator_result.details, + evaluation_time=evaluator_result.evaluation_time, + ) + elif score_type == ScoreType.ERROR: + evaluation_result = ErrorEvaluationResult( + score=evaluator_result.score, + details=evaluator_result.details, + evaluation_time=evaluator_result.evaluation_time, + ) + else: + evaluation_result = NumericEvaluationResult( + score=evaluator_result.score, + details=evaluator_result.details, + evaluation_time=evaluator_result.evaluation_time, + ) + + results.append( + EvalItemResult( + evaluator_id=evaluator_result.evaluator_id, + result=evaluation_result, + ) + ) + + return results diff --git a/src/uipath/_cli/_evals/_progress_reporter.py b/src/uipath/_cli/_evals/_progress_reporter.py index 25d8c9204..28a4774ae 100644 --- a/src/uipath/_cli/_evals/_progress_reporter.py +++ b/src/uipath/_cli/_evals/_progress_reporter.py @@ -710,6 +710,28 @@ async def handle_create_eval_run(self, payload: EvalRunCreatedEvent) -> None: async def handle_update_eval_run(self, payload: EvalRunUpdatedEvent) -> None: try: + if payload.skip_studio_web_reporting: + logger.debug( + f"Skipping Studio Web reporting for eval run update " + f"(execution_id={payload.execution_id}) — handled by remote backend" + ) + # Still track scores locally for aggregation + for eval_result in payload.eval_results: + evaluator_id = eval_result.evaluator_id + if evaluator_id in self.evaluator_scores: + match eval_result.result.score_type: + case ScoreType.NUMERICAL: + self.evaluator_scores[evaluator_id].append( + eval_result.result.score + ) + case ScoreType.BOOLEAN: + self.evaluator_scores[evaluator_id].append( + 100 if eval_result.result.score else 0 + ) + case ScoreType.ERROR: + self.evaluator_scores[evaluator_id].append(0) + return + logger.info( f"Processing UPDATE_EVAL_RUN event: execution_id={payload.execution_id}, " f"success={payload.success}" @@ -776,6 +798,13 @@ async def handle_update_eval_run(self, payload: EvalRunUpdatedEvent) -> None: async def 
handle_update_eval_set_run(self, payload: EvalSetRunUpdatedEvent) -> None: try: + if payload.skip_studio_web_reporting: + logger.debug( + f"Skipping Studio Web reporting for eval set run update " + f"(execution_id={payload.execution_id}) — handled by remote backend" + ) + return + if eval_set_run_id := self.eval_set_run_ids.get(payload.execution_id): # Skip update if eval_set_run_id was provided by user if eval_set_run_id in self.user_provided_eval_set_run_ids: diff --git a/src/uipath/_cli/_evals/_remote_evaluator.py b/src/uipath/_cli/_evals/_remote_evaluator.py new file mode 100644 index 000000000..3b09b062b --- /dev/null +++ b/src/uipath/_cli/_evals/_remote_evaluator.py @@ -0,0 +1,314 @@ +"""Remote evaluation client for submitting evaluations to the C# Agents backend.""" + +import asyncio +import logging +import os +from enum import Enum +from typing import Any + +from pydantic import BaseModel, Field + +from uipath._utils import Endpoint, RequestSpec +from uipath._utils.constants import ENV_EVAL_BACKEND_URL, ENV_TENANT_ID, HEADER_INTERNAL_TENANT_ID +from uipath.eval.models.serializable_span import SerializableSpan +from uipath.platform import UiPath + +logger = logging.getLogger(__name__) + + +# --- Request/Response Models --- + + +class EvaluatorConfigPayload(BaseModel): + """Evaluator configuration for the remote evaluation request.""" + + id: str + version: str = "1.0" + evaluator_type_id: str = Field(alias="evaluatorTypeId") + evaluator_config: dict[str, Any] = Field(default_factory=dict, alias="evaluatorConfig") + evaluator_schema: str = Field(default="", alias="evaluatorSchema") + + model_config = {"populate_by_name": True} + + +class EvaluationItemPayload(BaseModel): + """Individual evaluation item in the remote evaluation request.""" + + id: str + name: str + inputs: dict[str, Any] = Field(default_factory=dict) + evaluation_criterias: dict[str, dict[str, Any] | None] = Field( + default_factory=dict, alias="evaluationCriterias" + ) + expected_agent_behavior: str = Field(default="", alias="expectedAgentBehavior") + agent_output: dict[str, Any] | str | None = Field(default=None, alias="agentOutput") + agent_execution_time: float = Field(default=0.0, alias="agentExecutionTime") + serialized_traces: list[SerializableSpan] = Field( + default_factory=list, alias="serializedTraces" + ) + agent_error: str | None = Field(default=None, alias="agentError") + eval_run_id: str | None = Field(default=None, alias="evalRunId") + + model_config = {"populate_by_name": True} + + +class RemoteEvaluationRequest(BaseModel): + """Request payload for POST /api/evaluate.""" + + eval_set_run_id: str = Field(alias="evalSetRunId") + eval_set_id: str = Field(alias="evalSetId") + project_id: str = Field(alias="projectId") + entrypoint: str = "" + is_coded: bool = Field(default=True, alias="isCoded") + report_to_studio_web: bool = Field(default=True, alias="reportToStudioWeb") + evaluator_configs: list[EvaluatorConfigPayload] = Field( + default_factory=list, alias="evaluatorConfigs" + ) + evaluation_items: list[EvaluationItemPayload] = Field( + default_factory=list, alias="evaluationItems" + ) + + model_config = {"populate_by_name": True} + + +class RemoteJobStatus(str, Enum): + """Status values for a remote evaluation job.""" + + PENDING = "PENDING" + IN_PROGRESS = "IN_PROGRESS" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + PARTIALLY_COMPLETED = "PARTIALLY_COMPLETED" + + +class EvaluatorResultPayload(BaseModel): + """Single evaluator result from the remote backend.""" + + evaluator_id: str = 
Field(alias="evaluatorId") + score: float = 0.0 + score_type: int = Field(default=0, alias="scoreType") + details: str | None = None + evaluation_time: float = Field(default=0.0, alias="evaluationTime") + + model_config = {"populate_by_name": True} + + +class EvaluationItemResultPayload(BaseModel): + """Results for a single evaluation item from the remote backend.""" + + evaluation_item_id: str = Field(alias="evaluationItemId") + evaluator_results: list[EvaluatorResultPayload] = Field( + default_factory=list, alias="evaluatorResults" + ) + success: bool = True + + model_config = {"populate_by_name": True} + + +class RemoteEvaluationSubmitResponse(BaseModel): + """Response from POST /api/evaluate.""" + + evaluation_job_id: str = Field(alias="evaluationJobId") + status: RemoteJobStatus = RemoteJobStatus.PENDING + + model_config = {"populate_by_name": True} + + +class RemoteEvaluationStatusResponse(BaseModel): + """Response from GET /api/evaluate/status/{id}.""" + + evaluation_job_id: str = Field(alias="evaluationJobId") + status: RemoteJobStatus + results: list[EvaluationItemResultPayload] = Field(default_factory=list) + evaluator_averages: dict[str, float] = Field( + default_factory=dict, alias="evaluatorAverages" + ) + error: str | None = None + + model_config = {"populate_by_name": True} + + +# --- Client --- + + +class RemoteEvaluationClient: + """Client for submitting evaluations to the remote C# Agents backend. + + The backend runs evaluators via Temporal workflows and reports results + to Studio Web, decoupling evaluation execution from the CLI process. + """ + + def __init__(self, backend_url: str | None = None): + eval_backend_url = backend_url or os.getenv(ENV_EVAL_BACKEND_URL) + uipath = UiPath(base_url=eval_backend_url) if eval_backend_url else UiPath() + self._client = uipath.api_client + self._backend_url = eval_backend_url + + def _get_endpoint_prefix(self) -> str: + """Determine the endpoint prefix based on environment.""" + if self._backend_url: + from urllib.parse import urlparse + + try: + parsed = urlparse(self._backend_url) + hostname = parsed.hostname or parsed.netloc.split(":")[0] + if hostname.lower() in ("localhost", "127.0.0.1"): + return "api/" + except Exception: + pass + return "agentsruntime_/api/" + + def _tenant_header(self) -> dict[str, str | None]: + tenant_id = os.getenv(ENV_TENANT_ID, None) + return {HEADER_INTERNAL_TENANT_ID: tenant_id} + + def _is_localhost(self) -> bool: + if self._backend_url: + from urllib.parse import urlparse + + try: + parsed = urlparse(self._backend_url) + hostname = parsed.hostname or parsed.netloc.split(":")[0] + return hostname.lower() in ("localhost", "127.0.0.1") + except Exception: + pass + return False + + async def submit_evaluation( + self, request: RemoteEvaluationRequest + ) -> RemoteEvaluationSubmitResponse: + """Submit an evaluation job to the remote backend. + + Args: + request: The evaluation request payload. + + Returns: + Response with the evaluation job ID and initial status. + + Raises: + Exception: If the backend is unreachable or returns an error. 
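+
+        Note:
+            Localhost backends are called at api/evaluate with org scope;
+            hosted environments use agentsruntime_/api/evaluate with tenant
+            scope. The internal tenant id header is attached in both cases.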
+ """ + prefix = self._get_endpoint_prefix() + spec = RequestSpec( + method="POST", + endpoint=Endpoint(f"{prefix}evaluate"), + json=request.model_dump(by_alias=True), + headers=self._tenant_header(), + ) + + logger.info( + f"Submitting remote evaluation: eval_set_run_id={request.eval_set_run_id}, " + f"items={len(request.evaluation_items)}, evaluators={len(request.evaluator_configs)}" + ) + + response = await self._client.request_async( + method=spec.method, + url=spec.endpoint, + json=spec.json, + headers=spec.headers, + scoped="org" if self._is_localhost() else "tenant", + ) + + import json + + response_data = json.loads(response.content) + result = RemoteEvaluationSubmitResponse.model_validate(response_data) + + logger.info( + f"Remote evaluation submitted: job_id={result.evaluation_job_id}, " + f"status={result.status}" + ) + return result + + async def poll_status( + self, + job_id: str, + timeout: float = 600.0, + initial_interval: float = 1.0, + max_interval: float = 10.0, + ) -> RemoteEvaluationStatusResponse: + """Poll the remote backend for evaluation job status until completion. + + Uses exponential backoff for polling interval. + + Args: + job_id: The evaluation job ID to poll. + timeout: Maximum time in seconds to wait for completion. + initial_interval: Initial polling interval in seconds. + max_interval: Maximum polling interval in seconds. + + Returns: + The final status response with results. + + Raises: + TimeoutError: If the job does not complete within the timeout. + Exception: If the job fails or consecutive network errors occur. + """ + prefix = self._get_endpoint_prefix() + elapsed = 0.0 + interval = initial_interval + consecutive_errors = 0 + max_consecutive_errors = 5 + + terminal_statuses = { + RemoteJobStatus.COMPLETED, + RemoteJobStatus.FAILED, + RemoteJobStatus.PARTIALLY_COMPLETED, + } + + while elapsed < timeout: + try: + spec = RequestSpec( + method="GET", + endpoint=Endpoint(f"{prefix}evaluate/status/{job_id}"), + headers=self._tenant_header(), + ) + + response = await self._client.request_async( + method=spec.method, + url=spec.endpoint, + headers=spec.headers, + scoped="org" if self._is_localhost() else "tenant", + ) + + import json + + response_data = json.loads(response.content) + status_response = RemoteEvaluationStatusResponse.model_validate( + response_data + ) + + consecutive_errors = 0 # Reset on success + + logger.debug( + f"Poll status for job {job_id}: status={status_response.status}" + ) + + if status_response.status in terminal_statuses: + logger.info( + f"Remote evaluation job {job_id} reached terminal status: " + f"{status_response.status}" + ) + return status_response + + except Exception as e: + consecutive_errors += 1 + logger.warning( + f"Error polling evaluation status (attempt {consecutive_errors}/" + f"{max_consecutive_errors}): {e}" + ) + if consecutive_errors >= max_consecutive_errors: + raise RuntimeError( + f"Failed to poll evaluation status after {max_consecutive_errors} " + f"consecutive errors. Last error: {e}" + ) from e + + await asyncio.sleep(interval) + elapsed += interval + # Exponential backoff with cap + interval = min(interval * 1.5, max_interval) + + raise TimeoutError( + f"Remote evaluation job {job_id} did not complete within {timeout}s. " + f"You can check the status manually using the job ID." 
+ ) diff --git a/src/uipath/_cli/_evals/_runtime.py b/src/uipath/_cli/_evals/_runtime.py index 48c64139b..4c15598fe 100644 --- a/src/uipath/_cli/_evals/_runtime.py +++ b/src/uipath/_cli/_evals/_runtime.py @@ -84,6 +84,7 @@ UiPathEvalRunExecutionOutput, convert_eval_execution_output_to_serializable, ) +from ._evaluation_strategy import EvaluationStrategy, LocalEvaluationStrategy from ._span_collection import ExecutionSpanCollector from .mocks.mocks import ( cache_manager_context, @@ -207,6 +208,10 @@ class UiPathEvalContext: resume: bool = False job_id: str | None = None + # Remote evaluation fields + remote_evaluation: bool = False + evaluation_strategy: EvaluationStrategy | None = None + class UiPathEvalRuntime: """Specialized runtime for evaluation runs, with access to the factory.""" @@ -407,6 +412,7 @@ async def execute(self) -> UiPathRuntimeResult: execution_id=self.execution_id, evaluator_scores=evaluator_averages, success=not any_failed, + skip_studio_web_reporting=self.context.remote_evaluation, ), wait_for_completion=False, ) @@ -652,41 +658,32 @@ async def _execute_eval( agent_execution_output ) ) - evaluation_item_results: list[EvalItemResult] = [] - for evaluator in evaluators: - if evaluator.id not in eval_item.evaluation_criterias: - # Skip! - continue - evaluation_criteria = eval_item.evaluation_criterias[evaluator.id] - - evaluation_result = await self.run_evaluator( - evaluator=evaluator, - execution_output=agent_execution_output, - eval_item=eval_item, - # If evaluation criteria is None, validate_and_evaluate defaults to the default - evaluation_criteria=evaluator.evaluation_criteria_type( - **evaluation_criteria - ) - if evaluation_criteria - else None, - ) + # Run evaluators using the configured strategy + strategy = self.context.evaluation_strategy or LocalEvaluationStrategy() + is_remote = self.context.remote_evaluation + + evaluation_item_results = await strategy.evaluate( + eval_item=eval_item, + evaluators=evaluators, + execution_output=agent_execution_output, + run_evaluator_fn=self.run_evaluator, + ) + # Build DTOs from results + # Build a name lookup from evaluators for DTO construction + evaluator_name_map = {e.id: e.name for e in evaluators} + for item_result in evaluation_item_results: dto_result = EvaluationResultDto.from_evaluation_result( - evaluation_result + item_result.result ) - evaluation_run_results.evaluation_run_results.append( EvaluationRunResultDto( - evaluator_name=evaluator.name, + evaluator_name=evaluator_name_map.get( + item_result.evaluator_id, item_result.evaluator_id + ), result=dto_result, - evaluator_id=evaluator.id, - ) - ) - evaluation_item_results.append( - EvalItemResult( - evaluator_id=evaluator.id, - result=evaluation_result, + evaluator_id=item_result.evaluator_id, ) ) @@ -717,6 +714,7 @@ async def _execute_eval( spans=agent_execution_output.spans, logs=agent_execution_output.logs, exception_details=exception_details, + skip_studio_web_reporting=is_remote, ), wait_for_completion=False, ) diff --git a/src/uipath/_cli/cli_eval.py b/src/uipath/_cli/cli_eval.py index f488e6d2a..f691e7c5c 100644 --- a/src/uipath/_cli/cli_eval.py +++ b/src/uipath/_cli/cli_eval.py @@ -243,6 +243,13 @@ def _resolve_model_settings_override( default=False, help="Resume execution from a previous suspended state", ) +@click.option( + "--remote-eval", + is_flag=True, + default=False, + help="Run evaluators on the remote Agents backend instead of locally. 
" + "Also enabled by UIPATH_REMOTE_EVAL=true environment variable.", +) def eval( entrypoint: str | None, eval_set: str | None, @@ -258,6 +265,7 @@ def eval( max_llm_concurrency: int, input_overrides: dict[str, Any], resume: bool, + remote_eval: bool, ) -> None: """Run an evaluation set against the agent. @@ -275,6 +283,7 @@ def eval( max_llm_concurrency: Maximum concurrent LLM requests input_overrides: Input field overrides mapping (direct field override with deep merge) resume: Resume execution from a previous suspended state + remote_eval: Run evaluators on the remote Agents backend """ set_llm_concurrency(max_llm_concurrency) @@ -311,6 +320,12 @@ def eval( eval_context.input_overrides = input_overrides eval_context.resume = resume + # Enable remote evaluation via flag or environment variable + from uipath._utils.constants import ENV_REMOTE_EVAL + + use_remote_eval = remote_eval or os.getenv(ENV_REMOTE_EVAL, "").lower() == "true" + eval_context.remote_evaluation = use_remote_eval + try: async def execute_eval(): @@ -399,6 +414,42 @@ async def execute_eval(): # Runtime is not required anymore. await runtime.dispose() + # Configure remote evaluation strategy if enabled + if use_remote_eval: + from uipath._cli._evals._evaluation_strategy import ( + RemoteEvaluationStrategy, + ) + from uipath._cli._evals._remote_evaluator import ( + RemoteEvaluationClient, + ) + from uipath._utils.constants import ENV_EVAL_BACKEND_URL + + eval_backend_url = os.getenv(ENV_EVAL_BACKEND_URL) + remote_client = RemoteEvaluationClient( + backend_url=eval_backend_url + ) + + # Determine if evaluators are coded + from uipath.eval.evaluators import BaseLegacyEvaluator + + is_coded = bool(eval_context.evaluators) and not isinstance( + eval_context.evaluators[0], BaseLegacyEvaluator + ) + + eval_context.evaluation_strategy = RemoteEvaluationStrategy( + client=remote_client, + eval_set_run_id=eval_context.eval_set_run_id + or eval_context.execution_id, + eval_set_id=eval_context.evaluation_set.id, + project_id=project_id or "", + entrypoint=eval_context.entrypoint or "", + is_coded=is_coded, + report_to_studio_web=should_register_progress_reporter, + ) + console.info( + "Remote evaluation enabled — evaluators will run on the Agents backend" + ) + try: if project_id: studio_client = StudioClient(project_id) diff --git a/src/uipath/_events/_events.py b/src/uipath/_events/_events.py index df3a21921..54cece521 100644 --- a/src/uipath/_events/_events.py +++ b/src/uipath/_events/_events.py @@ -51,6 +51,7 @@ class EvalRunUpdatedEvent(BaseModel): spans: list[ReadableSpan] logs: list[logging.LogRecord] exception_details: EvalItemExceptionDetails | None = None + skip_studio_web_reporting: bool = False @model_validator(mode="after") def validate_exception_details(self): @@ -63,6 +64,7 @@ class EvalSetRunUpdatedEvent(BaseModel): execution_id: str evaluator_scores: dict[str, float] success: bool = True + skip_studio_web_reporting: bool = False ProgressEvent = Union[ diff --git a/src/uipath/_utils/constants.py b/src/uipath/_utils/constants.py index 2586db875..4d82dbf28 100644 --- a/src/uipath/_utils/constants.py +++ b/src/uipath/_utils/constants.py @@ -2,6 +2,7 @@ DOTENV_FILE = ".env" ENV_BASE_URL = "UIPATH_URL" ENV_EVAL_BACKEND_URL = "UIPATH_EVAL_BACKEND_URL" +ENV_REMOTE_EVAL = "UIPATH_REMOTE_EVAL" ENV_UNATTENDED_USER_ACCESS_TOKEN = "UNATTENDED_USER_ACCESS_TOKEN" ENV_UIPATH_ACCESS_TOKEN = "UIPATH_ACCESS_TOKEN" ENV_FOLDER_KEY = "UIPATH_FOLDER_KEY" diff --git a/src/uipath/eval/models/__init__.py b/src/uipath/eval/models/__init__.py 
index 9b0f21572..75ba8d85d 100644 --- a/src/uipath/eval/models/__init__.py +++ b/src/uipath/eval/models/__init__.py @@ -15,6 +15,8 @@ ToolCall, ToolOutput, ) +from uipath.eval.models.reconstructed_span import ReconstructedSpan +from uipath.eval.models.serializable_span import SerializableSpan __all__ = [ "AgentExecution", @@ -31,4 +33,6 @@ "ToolCall", "EvaluatorType", "ToolOutput", + "SerializableSpan", + "ReconstructedSpan", ] diff --git a/src/uipath/eval/models/reconstructed_span.py b/src/uipath/eval/models/reconstructed_span.py new file mode 100644 index 000000000..c7bb590d4 --- /dev/null +++ b/src/uipath/eval/models/reconstructed_span.py @@ -0,0 +1,209 @@ +"""Reconstructed span that duck-types the OpenTelemetry ReadableSpan interface. + +Used on the remote evaluation worker side to rebuild spans from serialized data +so that existing evaluators can consume them without code changes. +""" + +from __future__ import annotations + +from types import MappingProxyType +from typing import Any, Sequence + +from uipath.eval.models.serializable_span import SerializableSpan + + +class _ReconstructedStatus: + """Mimics opentelemetry.trace.Status for reconstructed spans.""" + + def __init__(self, status_code_value: int, description: str | None = None): + self._status_code = _ReconstructedStatusCode(status_code_value) + self._description = description + + @property + def status_code(self) -> "_ReconstructedStatusCode": + return self._status_code + + @property + def description(self) -> str | None: + return self._description + + +class _ReconstructedStatusCode: + """Mimics opentelemetry.trace.StatusCode enum values.""" + + UNSET = 0 + OK = 1 + ERROR = 2 + + def __init__(self, value: int): + self._value = value + + @property + def value(self) -> int: + return self._value + + def __eq__(self, other: object) -> bool: + if isinstance(other, _ReconstructedStatusCode): + return self._value == other._value + if isinstance(other, int): + return self._value == other + return NotImplemented + + def __hash__(self) -> int: + return hash(self._value) + + +class _ReconstructedSpanContext: + """Mimics opentelemetry.trace.SpanContext.""" + + def __init__(self, trace_id: int, span_id: int): + self._trace_id = trace_id + self._span_id = span_id + + @property + def trace_id(self) -> int: + return self._trace_id + + @property + def span_id(self) -> int: + return self._span_id + + @property + def trace_flags(self) -> int: + return 0x01 # Sampled + + @property + def trace_state(self) -> None: + return None + + @property + def is_valid(self) -> bool: + return self._trace_id != 0 and self._span_id != 0 + + +class _ReconstructedEvent: + """Mimics opentelemetry.sdk.trace.Event.""" + + def __init__( + self, + name: str, + attributes: dict[str, Any] | None = None, + timestamp: int | None = None, + ): + self._name = name + self._attributes = MappingProxyType(attributes or {}) + self._timestamp = timestamp + + @property + def name(self) -> str: + return self._name + + @property + def attributes(self) -> MappingProxyType[str, Any]: + return self._attributes + + @property + def timestamp(self) -> int | None: + return self._timestamp + + +class ReconstructedSpan: + """A span reconstructed from SerializableSpan data. + + Implements the same interface as opentelemetry.sdk.trace.ReadableSpan + (duck-typing) so that evaluators that expect ReadableSpan can work + with reconstructed spans without modification. 
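+
+    Only the fields evaluators typically read are reproduced: name, timing,
+    attributes, events, status, and span/parent context. SDK-internal
+    properties such as resource and instrumentation_info return None, and
+    links is always empty.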
+ """ + + def __init__(self, data: SerializableSpan): + self._data = data + + # Parse hex IDs into integers + self._trace_id = int(data.trace_id, 16) if data.trace_id else 0 + self._span_id = int(data.span_id, 16) if data.span_id else 0 + + # Build span context + self._context = _ReconstructedSpanContext(self._trace_id, self._span_id) + + # Build parent context + self._parent: _ReconstructedSpanContext | None = None + if data.parent_span_id: + parent_span_id = int(data.parent_span_id, 16) + self._parent = _ReconstructedSpanContext(self._trace_id, parent_span_id) + + # Build status + status_map = {"unset": 0, "ok": 1, "error": 2} + status_code_value = status_map.get(data.status, 0) + self._status = _ReconstructedStatus(status_code_value, data.status_description) + + # Build attributes as MappingProxy (matching ReadableSpan behavior) + self._attributes: MappingProxyType[str, Any] = MappingProxyType( + data.attributes + ) + + # Build events + self._events: tuple[_ReconstructedEvent, ...] = tuple( + _ReconstructedEvent( + name=e.name, + attributes=e.attributes, + timestamp=e.timestamp, + ) + for e in data.events + ) + + @property + def name(self) -> str: + return self._data.name + + @property + def start_time(self) -> int | None: + return self._data.start_time_unix_nano or None + + @property + def end_time(self) -> int | None: + return self._data.end_time_unix_nano or None + + @property + def attributes(self) -> MappingProxyType[str, Any]: + return self._attributes + + @property + def events(self) -> Sequence[_ReconstructedEvent]: + return self._events + + @property + def status(self) -> _ReconstructedStatus: + return self._status + + @property + def parent(self) -> _ReconstructedSpanContext | None: + return self._parent + + def get_span_context(self) -> _ReconstructedSpanContext: + return self._context + + @property + def resource(self) -> None: + return None + + @property + def instrumentation_info(self) -> None: + return None + + @property + def links(self) -> tuple[()]: + return () + + @classmethod + def from_serializable_spans( + cls, spans: list[SerializableSpan] + ) -> list["ReconstructedSpan"]: + """Convert a list of SerializableSpans to ReconstructedSpans. + + Args: + spans: List of SerializableSpan instances. + + Returns: + List of ReconstructedSpan instances. + """ + return [cls(s) for s in spans] diff --git a/src/uipath/eval/models/serializable_span.py b/src/uipath/eval/models/serializable_span.py new file mode 100644 index 000000000..4324171de --- /dev/null +++ b/src/uipath/eval/models/serializable_span.py @@ -0,0 +1,147 @@ +"""JSON-serializable span model for trace data transport across process boundaries.""" + +import logging +from typing import Any + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class SerializableSpanEvent(BaseModel): + """JSON-serializable representation of a span event.""" + + name: str + timestamp: int | None = None + attributes: dict[str, Any] = Field(default_factory=dict) + + +class SerializableSpan(BaseModel): + """JSON-serializable representation of an OpenTelemetry ReadableSpan. + + Used to transport span data across process boundaries (e.g., from CLI to + a remote evaluation backend). Preserves full span data including IDs and + timestamps so that spans can be reconstructed on the receiving end. 
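+
+    Attribute and event values are coerced to JSON-safe primitives via
+    _make_json_safe; values that cannot be converted are skipped with a
+    debug log instead of failing the whole span.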
+ """ + + name: str + span_id: str = Field(description="16-char hex OTEL span ID") + trace_id: str = Field(description="32-char hex OTEL trace ID") + parent_span_id: str | None = Field( + default=None, description="16-char hex parent span ID" + ) + status: str = Field( + default="unset", description="Span status: 'unset', 'ok', or 'error'" + ) + status_description: str | None = Field( + default=None, description="Status description (typically set for errors)" + ) + start_time_unix_nano: int = Field( + default=0, description="Start time in nanoseconds since epoch" + ) + end_time_unix_nano: int = Field( + default=0, description="End time in nanoseconds since epoch" + ) + attributes: dict[str, Any] = Field(default_factory=dict) + events: list[SerializableSpanEvent] = Field(default_factory=list) + + @classmethod + def from_readable_span(cls, span: Any) -> "SerializableSpan": + """Convert an OpenTelemetry ReadableSpan to a SerializableSpan. + + Args: + span: An OpenTelemetry ReadableSpan instance. + + Returns: + SerializableSpan with all relevant data extracted. + """ + try: + span_context = span.get_span_context() + span_id = format(span_context.span_id, "016x") + trace_id = format(span_context.trace_id, "032x") + except Exception: + logger.warning( + f"Failed to extract span context from span '{getattr(span, 'name', '?')}', " + "using zero IDs" + ) + span_id = "0" * 16 + trace_id = "0" * 32 + + # Extract parent span ID + parent_span_id = None + if span.parent is not None: + try: + parent_span_id = format(span.parent.span_id, "016x") + except Exception: + pass + + # Extract status + status_map = {0: "unset", 1: "ok", 2: "error"} + try: + status = status_map.get(span.status.status_code.value, "unknown") + status_description = span.status.description + except Exception: + status = "unset" + status_description = None + + # Extract timestamps + start_time_unix_nano = span.start_time or 0 + end_time_unix_nano = span.end_time or 0 + + # Extract attributes + attributes: dict[str, Any] = {} + if span.attributes: + for key, value in span.attributes.items(): + try: + attributes[key] = _make_json_safe(value) + except Exception: + logger.debug(f"Skipping non-serializable attribute '{key}'") + + # Extract events + events: list[SerializableSpanEvent] = [] + if hasattr(span, "events") and span.events: + for event in span.events: + try: + event_attrs = {} + if event.attributes: + for k, v in event.attributes.items(): + try: + event_attrs[k] = _make_json_safe(v) + except Exception: + pass + events.append( + SerializableSpanEvent( + name=event.name, + timestamp=event.timestamp, + attributes=event_attrs, + ) + ) + except Exception: + logger.debug(f"Skipping non-serializable event") + + return cls( + name=span.name, + span_id=span_id, + trace_id=trace_id, + parent_span_id=parent_span_id, + status=status, + status_description=status_description, + start_time_unix_nano=start_time_unix_nano, + end_time_unix_nano=end_time_unix_nano, + attributes=attributes, + events=events, + ) + + +def _make_json_safe(value: Any) -> Any: + """Convert a value to a JSON-safe representation. + + OpenTelemetry attribute values are constrained to primitives and + sequences of primitives, so this is straightforward. 
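+    Sequences are converted to lists and any unhandled type falls back to its
+    string representation.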
+ """ + if isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, (list, tuple)): + return [_make_json_safe(v) for v in value] + # Fall back to string representation for unknown types + return str(value) diff --git a/src/uipath/tracing/_otel_exporters.py b/src/uipath/tracing/_otel_exporters.py index a47c3acd1..c21d21e6c 100644 --- a/src/uipath/tracing/_otel_exporters.py +++ b/src/uipath/tracing/_otel_exporters.py @@ -134,7 +134,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return SpanExportResult.SUCCESS logger.debug( - f"Exporting {len(spans)} spans to {self.base_url}/llmopstenant_/api/Traces/spans" + f"Exporting {len(spans)} spans to {self.base_url}/api/Traces/spans" ) # Use optimized path: keep attributes as dict for processing @@ -369,7 +369,7 @@ def _process_span_attributes(self, span_data: Dict[str, Any]) -> None: def _build_url(self, span_list: list[Dict[str, Any]]) -> str: """Construct the URL for the API request.""" trace_id = str(span_list[0]["TraceId"]) - return f"{self.base_url}/llmopstenant_/api/Traces/spans?traceId={trace_id}&source=Robots" + return f"{self.base_url}/api/Traces/spans?traceId={trace_id}&source=Robots" def _send_with_retries( self, url: str, payload: list[Dict[str, Any]], max_retries: int = 4 @@ -393,6 +393,10 @@ def _send_with_retries( return SpanExportResult.FAILURE def _get_base_url(self) -> str: + trace_base_url = os.environ.get("UIPATH_TRACE_BASE_URL") + if trace_base_url: + return trace_base_url.rstrip("/") + uipath_url = ( os.environ.get("UIPATH_URL") or "https://cloud.uipath.com/dummyOrg/dummyTennant/" @@ -400,7 +404,7 @@ def _get_base_url(self) -> str: uipath_url = uipath_url.rstrip("/") - return uipath_url + return f"{uipath_url}/llmopstenant_" class JsonLinesFileExporter(SpanExporter): diff --git a/tests/tracing/test_otel_exporters.py b/tests/tracing/test_otel_exporters.py index 40aaa3938..9d8b4bd22 100644 --- a/tests/tracing/test_otel_exporters.py +++ b/tests/tracing/test_otel_exporters.py @@ -64,7 +64,7 @@ def test_init_with_env_vars(mock_env_vars): with patch("uipath.tracing._otel_exporters.httpx.Client"): exporter = LlmOpsHttpExporter() - assert exporter.base_url == "https://test.uipath.com/org/tenant" + assert exporter.base_url == "https://test.uipath.com/org/tenant/llmopstenant_" assert exporter.auth_token == "test-token" assert exporter.headers == { "Content-Type": "application/json", @@ -80,7 +80,10 @@ def test_init_with_default_url(): ): exporter = LlmOpsHttpExporter() - assert exporter.base_url == "https://cloud.uipath.com/dummyOrg/dummyTennant" + assert ( + exporter.base_url + == "https://cloud.uipath.com/dummyOrg/dummyTennant/llmopstenant_" + ) assert exporter.auth_token == "test-token" @@ -164,7 +167,10 @@ def test_get_base_url(): ): with patch("uipath.tracing._otel_exporters.httpx.Client"): exporter = LlmOpsHttpExporter() - assert exporter.base_url == "https://custom.uipath.com/org/tenant" + assert ( + exporter.base_url + == "https://custom.uipath.com/org/tenant/llmopstenant_" + ) # Test with environment variable set but with no trailing slash with patch.dict( @@ -172,13 +178,42 @@ def test_get_base_url(): ): with patch("uipath.tracing._otel_exporters.httpx.Client"): exporter = LlmOpsHttpExporter() - assert exporter.base_url == "https://custom.uipath.com/org/tenant" + assert ( + exporter.base_url + == "https://custom.uipath.com/org/tenant/llmopstenant_" + ) # Test with no environment variable with patch.dict(os.environ, {}, clear=True): with 
patch("uipath.tracing._otel_exporters.httpx.Client"): exporter = LlmOpsHttpExporter() - assert exporter.base_url == "https://cloud.uipath.com/dummyOrg/dummyTennant" + assert ( + exporter.base_url + == "https://cloud.uipath.com/dummyOrg/dummyTennant/llmopstenant_" + ) + + # Test UIPATH_TRACE_BASE_URL takes precedence over UIPATH_URL + with patch.dict( + os.environ, + { + "UIPATH_TRACE_BASE_URL": "https://custom-trace.example.com/my_prefix", + "UIPATH_URL": "https://custom.uipath.com/org/tenant", + }, + clear=True, + ): + with patch("uipath.tracing._otel_exporters.httpx.Client"): + exporter = LlmOpsHttpExporter() + assert exporter.base_url == "https://custom-trace.example.com/my_prefix" + + # Test UIPATH_TRACE_BASE_URL strips trailing slash + with patch.dict( + os.environ, + {"UIPATH_TRACE_BASE_URL": "https://custom-trace.example.com/prefix/"}, + clear=True, + ): + with patch("uipath.tracing._otel_exporters.httpx.Client"): + exporter = LlmOpsHttpExporter() + assert exporter.base_url == "https://custom-trace.example.com/prefix" def test_send_with_retries_success():