diff --git a/src/uipath/platform/_uipath.py b/src/uipath/platform/_uipath.py index 044d6f497..df861627f 100644 --- a/src/uipath/platform/_uipath.py +++ b/src/uipath/platform/_uipath.py @@ -6,6 +6,7 @@ from .._utils._auth import resolve_config_from_env from .action_center import TasksService from .agenthub._agenthub_service import AgentHubService +from .automation_tracker import AutomationTrackerService from .chat import ConversationsService, UiPathLlmChatService, UiPathOpenAIService from .common import ( ApiClient, @@ -167,3 +168,7 @@ def guardrails(self) -> GuardrailsService: @property def agenthub(self) -> AgentHubService: return AgentHubService(self._config, self._execution_context, self.folders) + + @property + def automation_tracker(self) -> AutomationTrackerService: + return AutomationTrackerService(self._config, self._execution_context) diff --git a/src/uipath/platform/automation_tracker/__init__.py b/src/uipath/platform/automation_tracker/__init__.py new file mode 100644 index 000000000..f4d190149 --- /dev/null +++ b/src/uipath/platform/automation_tracker/__init__.py @@ -0,0 +1,23 @@ +"""UiPath Automation Tracker (BTS) Models. + +This module contains models and service for tracking business transactions +and operations via the Business Transaction Service (BTS). +""" + +from ._automation_tracker_service import AutomationTrackerService +from .automation_tracker import ( + BusinessObjectPayload, + OperationPayload, + OperationStatus, + TransactionPayload, + TransactionStatus, +) + +__all__ = [ + "AutomationTrackerService", + "BusinessObjectPayload", + "OperationPayload", + "OperationStatus", + "TransactionPayload", + "TransactionStatus", +] diff --git a/src/uipath/platform/automation_tracker/_automation_tracker_service.py b/src/uipath/platform/automation_tracker/_automation_tracker_service.py new file mode 100644 index 000000000..b9b7f3d82 --- /dev/null +++ b/src/uipath/platform/automation_tracker/_automation_tracker_service.py @@ -0,0 +1,332 @@ +"""Automation Tracker (BTS) service for UiPath Platform. + +Provides HTTP client methods for tracking business transactions +and operations via the Business Transaction Service, used for Process Mining. +All errors are logged but never raised, ensuring BTS failures +cannot break agent execution. +""" + +import os +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +from ..._utils import Endpoint, RequestSpec +from ..._utils.constants import ENV_ORGANIZATION_ID, ENV_TENANT_ID +from ...tracing import traced +from ..common import BaseService, UiPathApiConfig, UiPathExecutionContext +from .automation_tracker import ( + OperationPayload, + OperationStatus, + TransactionPayload, + TransactionStatus, +) + + +class AutomationTrackerService(BaseService): + """Service for tracking business transactions and operations via BTS. + + This service provides methods to start/end transactions and operations + for Process Mining tracking. All errors are logged but never raised, + ensuring BTS failures cannot break agent execution. + """ + + def __init__( + self, + config: UiPathApiConfig, + execution_context: UiPathExecutionContext, + ) -> None: + super().__init__(config=config, execution_context=execution_context) + self._organization_id = os.getenv(ENV_ORGANIZATION_ID, "") + self._tenant_id = os.getenv(ENV_TENANT_ID, "") + + def _send(self, endpoint: str, payload_dict: Dict[str, Any]) -> None: + """Send a POST request to BTS, logging but never raising errors.""" + spec = RequestSpec( + method="POST", + endpoint=Endpoint(f"/automationtracker_/{endpoint}"), + json=payload_dict, + ) + try: + self.request( + spec.method, + url=spec.endpoint, + json=spec.json, + ) + except Exception: + self._logger.error( + "Failed to send request to BTS endpoint %s", + endpoint, + exc_info=True, + ) + + async def _send_async(self, endpoint: str, payload_dict: Dict[str, Any]) -> None: + """Send an async POST request to BTS, logging but never raising errors.""" + spec = RequestSpec( + method="POST", + endpoint=Endpoint(f"/automationtracker_/{endpoint}"), + json=payload_dict, + ) + try: + await self.request_async( + spec.method, + url=spec.endpoint, + json=spec.json, + ) + except Exception: + self._logger.error( + "Failed to send request to BTS endpoint %s", + endpoint, + exc_info=True, + ) + + # ── Transaction methods ────────────────────────────────────────── + + @traced(name="automation_tracker_start_transaction", run_type="uipath") + def start_transaction( + self, + *, + transaction_id: str, + name: str, + reference: str, + fingerprint: str, + status: TransactionStatus = TransactionStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """Start tracking a business transaction.""" + payload = TransactionPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + name=name, + reference=reference, + timestamp=timestamp or datetime.now(timezone.utc), + fingerprint=fingerprint, + result=result, + status=status.value, + attributes=attributes or {}, + ) + self._send( + "track/transaction/start", payload.model_dump(by_alias=True, mode="json") + ) + + @traced(name="automation_tracker_start_transaction", run_type="uipath") + async def start_transaction_async( + self, + *, + transaction_id: str, + name: str, + reference: str, + fingerprint: str, + status: TransactionStatus = TransactionStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """Start tracking a business transaction (async).""" + payload = TransactionPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + name=name, + reference=reference, + timestamp=timestamp or datetime.now(timezone.utc), + fingerprint=fingerprint, + result=result, + status=status.value, + attributes=attributes or {}, + ) + await self._send_async( + "track/transaction/start", payload.model_dump(by_alias=True, mode="json") + ) + + @traced(name="automation_tracker_end_transaction", run_type="uipath") + def end_transaction( + self, + *, + transaction_id: str, + name: str, + reference: str, + fingerprint: str, + status: TransactionStatus = TransactionStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """End tracking a business transaction.""" + payload = TransactionPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + name=name, + reference=reference, + timestamp=timestamp or datetime.now(timezone.utc), + fingerprint=fingerprint, + result=result, + status=status.value, + attributes=attributes or {}, + ) + self._send( + "track/transaction/end", payload.model_dump(by_alias=True, mode="json") + ) + + @traced(name="automation_tracker_end_transaction", run_type="uipath") + async def end_transaction_async( + self, + *, + transaction_id: str, + name: str, + reference: str, + fingerprint: str, + status: TransactionStatus = TransactionStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """End tracking a business transaction (async).""" + payload = TransactionPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + name=name, + reference=reference, + timestamp=timestamp or datetime.now(timezone.utc), + fingerprint=fingerprint, + result=result, + status=status.value, + attributes=attributes or {}, + ) + await self._send_async( + "track/transaction/end", payload.model_dump(by_alias=True, mode="json") + ) + + # ── Operation methods ──────────────────────────────────────────── + + @traced(name="automation_tracker_start_operation", run_type="uipath") + def start_operation( + self, + *, + transaction_id: str, + operation_id: str, + name: str, + fingerprint: str, + parent_operation: Optional[str] = None, + status: OperationStatus = OperationStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """Start tracking an operation within a transaction.""" + payload = OperationPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + operation_id=operation_id, + parent_operation=parent_operation, + name=name, + timestamp=timestamp or datetime.now(timezone.utc), + status=status.value, + attributes=attributes or {}, + result=result, + fingerprint=fingerprint, + ) + self._send( + "track/operation/start", payload.model_dump(by_alias=True, mode="json") + ) + + @traced(name="automation_tracker_start_operation", run_type="uipath") + async def start_operation_async( + self, + *, + transaction_id: str, + operation_id: str, + name: str, + fingerprint: str, + parent_operation: Optional[str] = None, + status: OperationStatus = OperationStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """Start tracking an operation within a transaction (async).""" + payload = OperationPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + operation_id=operation_id, + parent_operation=parent_operation, + name=name, + timestamp=timestamp or datetime.now(timezone.utc), + status=status.value, + attributes=attributes or {}, + result=result, + fingerprint=fingerprint, + ) + await self._send_async( + "track/operation/start", payload.model_dump(by_alias=True, mode="json") + ) + + @traced(name="automation_tracker_end_operation", run_type="uipath") + def end_operation( + self, + *, + transaction_id: str, + operation_id: str, + name: str, + fingerprint: str, + parent_operation: Optional[str] = None, + status: OperationStatus = OperationStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """End tracking an operation within a transaction.""" + payload = OperationPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + operation_id=operation_id, + parent_operation=parent_operation, + name=name, + timestamp=timestamp or datetime.now(timezone.utc), + status=status.value, + attributes=attributes or {}, + result=result, + fingerprint=fingerprint, + ) + self._send( + "track/operation/end", payload.model_dump(by_alias=True, mode="json") + ) + + @traced(name="automation_tracker_end_operation", run_type="uipath") + async def end_operation_async( + self, + *, + transaction_id: str, + operation_id: str, + name: str, + fingerprint: str, + parent_operation: Optional[str] = None, + status: OperationStatus = OperationStatus.UNKNOWN, + result: Optional[str] = None, + attributes: Optional[Dict[str, str]] = None, + timestamp: Optional[datetime] = None, + ) -> None: + """End tracking an operation within a transaction (async).""" + payload = OperationPayload( + organization_id=self._organization_id, + tenant_id=self._tenant_id, + transaction_id=transaction_id, + operation_id=operation_id, + parent_operation=parent_operation, + name=name, + timestamp=timestamp or datetime.now(timezone.utc), + status=status.value, + attributes=attributes or {}, + result=result, + fingerprint=fingerprint, + ) + await self._send_async( + "track/operation/end", payload.model_dump(by_alias=True, mode="json") + ) diff --git a/src/uipath/platform/automation_tracker/automation_tracker.py b/src/uipath/platform/automation_tracker/automation_tracker.py new file mode 100644 index 000000000..7d67efe77 --- /dev/null +++ b/src/uipath/platform/automation_tracker/automation_tracker.py @@ -0,0 +1,79 @@ +"""Automation Tracker (BTS) models for UiPath Platform. + +Models for tracking business transactions and operations +via the Business Transaction Service, used for Process Mining. +""" + +from datetime import datetime +from enum import Enum +from typing import Dict, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class TransactionStatus(str, Enum): + """Status of a BTS transaction.""" + + UNKNOWN = "Unknown" + SUCCESSFUL = "Successful" + FAILED = "Failed" + + +class OperationStatus(str, Enum): + """Status of a BTS operation.""" + + UNKNOWN = "Unknown" + SUCCESSFUL = "Successful" + FAILED = "Failed" + CANCELLED = "Cancelled" + ABANDONED = "Abandoned" + + +class TransactionPayload(BaseModel): + """Wire-format payload for BTS transaction start/end endpoints.""" + + model_config = ConfigDict(populate_by_name=True) + + organization_id: str = Field(alias="organizationId") + tenant_id: str = Field(alias="tenantId") + transaction_id: str = Field(alias="transactionId") + name: str + reference: str + timestamp: datetime + fingerprint: str + result: Optional[str] = None + status: str + attributes: Dict[str, str] = Field(default_factory=dict) + + +class OperationPayload(BaseModel): + """Wire-format payload for BTS operation start/end endpoints.""" + + model_config = ConfigDict(populate_by_name=True) + + organization_id: str = Field(alias="organizationId") + tenant_id: str = Field(alias="tenantId") + transaction_id: str = Field(alias="transactionId") + operation_id: str = Field(alias="operationId") + parent_operation: Optional[str] = Field(default=None, alias="parentOperation") + name: str + timestamp: datetime + status: str + attributes: Dict[str, str] = Field(default_factory=dict) + result: Optional[str] = None + fingerprint: str + + +class BusinessObjectPayload(BaseModel): + """Wire-format payload for BTS business object tracking.""" + + model_config = ConfigDict(populate_by_name=True) + + organization_id: str = Field(alias="organizationId") + tenant_id: str = Field(alias="tenantId") + operation_id: str = Field(alias="operationId") + timestamp: datetime + fingerprint: str + type: str + key: str + interaction_type: str = Field(alias="interactionType")