11"""Implementation of UiPath resume trigger protocols."""
22
33import json
4+ import os
45import uuid
56from typing import Any
67
2122from uipath .platform .action_center import Task
2223from uipath .platform .action_center .tasks import TaskStatus
2324from uipath .platform .common import (
25+ CreateBatchTransform ,
26+ CreateDeepRag ,
2427 CreateEscalation ,
2528 CreateTask ,
2629 InvokeProcess ,
30+ WaitBatchTransform ,
31+ WaitDeepRag ,
2732 WaitEscalation ,
2833 WaitJob ,
2934 WaitTask ,
3035)
36+ from uipath .platform .context_grounding import DeepRagStatus
37+ from uipath .platform .errors import BatchTransformNotCompleteException
3138from uipath .platform .orchestrator .job import JobState
3239from uipath .platform .resume_triggers ._enums import PropertyName , TriggerMarker
3340
@@ -55,7 +62,8 @@ class UiPathResumeTriggerReader:
5562 Implements UiPathResumeTriggerReaderProtocol.
5663 """
5764
58- def _extract_name_hint (self , field_name : str , payload : Any ) -> str | None :
65+ def _extract_field (self , field_name : str , payload : Any ) -> str | None :
66+ """Extracts a field from the payload and returns it if it exists."""
5967 if not payload :
6068 return payload
6169
@@ -100,7 +108,7 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
100108 trigger .item_key ,
101109 app_folder_key = trigger .folder_key ,
102110 app_folder_path = trigger .folder_path ,
103- app_name = self ._extract_name_hint ("app_name" , trigger .payload ),
111+ app_name = self ._extract_field ("app_name" , trigger .payload ),
104112 )
105113 pending_status = TaskStatus .PENDING .value
106114 unassigned_status = TaskStatus .UNASSIGNED .value
@@ -137,7 +145,7 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
137145 trigger .item_key ,
138146 folder_key = trigger .folder_key ,
139147 folder_path = trigger .folder_path ,
140- process_name = self ._extract_name_hint ("name" , trigger .payload ),
148+ process_name = self ._extract_field ("name" , trigger .payload ),
141149 )
142150 job_state = (job .state or "" ).lower ()
143151 successful_state = JobState .SUCCESSFUL .value
@@ -177,6 +185,64 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
177185 }
178186
179187 return trigger_response
188+ case UiPathResumeTriggerType .DEEP_RAG :
189+ if trigger .item_key :
190+ deep_rag = await uipath .context_grounding .retrieve_deep_rag_async (
191+ trigger .item_key ,
192+ index_name = self ._extract_field ("index_name" , trigger .payload ),
193+ )
194+ deep_rag_status = deep_rag .last_deep_rag_status
195+
196+ if deep_rag_status in (
197+ DeepRagStatus .QUEUED ,
198+ DeepRagStatus .IN_PROGRESS ,
199+ ):
200+ raise UiPathPendingTriggerError (
201+ ErrorCategory .SYSTEM ,
202+ f"DeepRag is not finished yet. Current status: { deep_rag_status } " ,
203+ )
204+
205+ if deep_rag_status != DeepRagStatus .SUCCESSFUL :
206+ raise UiPathFaultedTriggerError (
207+ ErrorCategory .USER ,
208+ f"DeepRag '{ deep_rag .name } ' did not finish successfully." ,
209+ )
210+
211+ trigger_response = deep_rag .content
212+
213+ # if response is an empty dictionary, use Deep Rag state as placeholder value
214+ if not trigger_response :
215+ trigger_response = {
216+ "status" : deep_rag_status ,
217+ PropertyName .INTERNAL .value : TriggerMarker .NO_CONTENT .value ,
218+ }
219+ else :
220+ trigger_response = trigger_response .model_dump ()
221+
222+ return trigger_response
223+
224+ case UiPathResumeTriggerType .BATCH_RAG :
225+ if trigger .item_key :
226+ destination_path = self ._extract_field (
227+ "destination_path" , trigger .payload
228+ )
229+ assert destination_path is not None
230+ try :
231+ await uipath .context_grounding .download_batch_transform_result_async (
232+ trigger .item_key ,
233+ destination_path ,
234+ validate_status = True ,
235+ index_name = self ._extract_field (
236+ "index_name" , trigger .payload
237+ ),
238+ )
239+ except BatchTransformNotCompleteException as e :
240+ raise UiPathPendingTriggerError (
241+ ErrorCategory .SYSTEM ,
242+ f"{ e .message } " ,
243+ ) from e
244+
245+ return f"Batch transform completed. Modified file available at { os .path .abspath (destination_path )} "
180246
181247 case UiPathResumeTriggerType .API :
182248 if trigger .api_resume and trigger .api_resume .inbox_id :
@@ -190,6 +256,7 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
190256 f"Failed to get trigger payload"
191257 f"Error fetching API trigger payload for inbox { trigger .api_resume .inbox_id } : { str (e )} " ,
192258 ) from e
259+
193260 case _:
194261 raise UiPathFaultedTriggerError (
195262 ErrorCategory .SYSTEM ,
@@ -256,6 +323,15 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
256323 case UiPathResumeTriggerType .API :
257324 self ._handle_api_trigger (suspend_value , resume_trigger )
258325
326+ case UiPathResumeTriggerType .DEEP_RAG :
327+ await self ._handle_deep_rag_job_trigger (
328+ suspend_value , resume_trigger , uipath
329+ )
330+ case UiPathResumeTriggerType .BATCH_RAG :
331+ await self ._handle_batch_rag_job_trigger (
332+ suspend_value , resume_trigger , uipath
333+ )
334+
259335 case _:
260336 raise UiPathFaultedTriggerError (
261337 ErrorCategory .SYSTEM ,
@@ -283,6 +359,10 @@ def _determine_trigger_type(self, value: Any) -> UiPathResumeTriggerType:
283359 return UiPathResumeTriggerType .TASK
284360 if isinstance (value , (InvokeProcess , WaitJob )):
285361 return UiPathResumeTriggerType .JOB
362+ if isinstance (value , (CreateDeepRag , WaitDeepRag )):
363+ return UiPathResumeTriggerType .DEEP_RAG
364+ if isinstance (value , (CreateBatchTransform , WaitBatchTransform )):
365+ return UiPathResumeTriggerType .BATCH_RAG
286366 # default to API trigger
287367 return UiPathResumeTriggerType .API
288368
@@ -301,6 +381,10 @@ def _determine_trigger_name(self, value: Any) -> UiPathResumeTriggerName:
301381 return UiPathResumeTriggerName .TASK
302382 if isinstance (value , (InvokeProcess , WaitJob )):
303383 return UiPathResumeTriggerName .JOB
384+ if isinstance (value , (CreateDeepRag , WaitDeepRag )):
385+ return UiPathResumeTriggerName .DEEP_RAG
386+ if isinstance (value , (CreateBatchTransform , WaitBatchTransform )):
387+ return UiPathResumeTriggerName .BATCH_RAG
304388 # default to API trigger
305389 return UiPathResumeTriggerName .API
306390
@@ -333,6 +417,63 @@ async def _handle_task_trigger(
333417 raise Exception ("Failed to create action" )
334418 resume_trigger .item_key = action .key
335419
420+ async def _handle_deep_rag_job_trigger (
421+ self , value : Any , resume_trigger : UiPathResumeTrigger , uipath : UiPath
422+ ) -> None :
423+ """Handle job-type resume triggers.
424+
425+ Args:
426+ value: The suspend value (InvokeProcess or WaitJob)
427+ resume_trigger: The resume trigger to populate
428+ uipath: The UiPath client instance
429+ """
430+ resume_trigger .folder_path = value .index_folder_path
431+ resume_trigger .folder_key = value .index_folder_key
432+ if isinstance (value , WaitDeepRag ):
433+ resume_trigger .item_key = value .deep_rag .id
434+ elif isinstance (value , CreateDeepRag ):
435+ deep_rag = await uipath .context_grounding .start_deep_rag_async (
436+ name = value .name ,
437+ index_name = value .index_name ,
438+ prompt = value .prompt ,
439+ glob_pattern = value .glob_pattern ,
440+ citation_mode = value .citation_mode ,
441+ folder_path = value .index_folder_path ,
442+ folder_key = value .index_folder_key ,
443+ )
444+ if not deep_rag :
445+ raise Exception ("Failed to start deep rag" )
446+ resume_trigger .item_key = deep_rag .id
447+
448+ async def _handle_batch_rag_job_trigger (
449+ self , value : Any , resume_trigger : UiPathResumeTrigger , uipath : UiPath
450+ ) -> None :
451+ """Handle job-type resume triggers.
452+
453+ Args:
454+ value: The suspend value (InvokeProcess or WaitJob)
455+ resume_trigger: The resume trigger to populate
456+ uipath: The UiPath client instance
457+ """
458+ resume_trigger .folder_path = value .index_folder_path
459+ resume_trigger .folder_key = value .index_folder_key
460+ if isinstance (value , WaitBatchTransform ):
461+ resume_trigger .item_key = value .batch_transform .id
462+ elif isinstance (value , CreateBatchTransform ):
463+ batch_transform = await uipath .context_grounding .start_batch_transform_async (
464+ name = value .name ,
465+ index_name = value .index_name ,
466+ prompt = value .prompt ,
467+ output_columns = value .output_columns ,
468+ storage_bucket_folder_path_prefix = value .storage_bucket_folder_path_prefix ,
469+ enable_web_search_grounding = value .enable_web_search_grounding ,
470+ folder_path = value .index_folder_path ,
471+ folder_key = value .index_folder_key ,
472+ )
473+ if not batch_transform :
474+ raise Exception ("Failed to start batch transform" )
475+ resume_trigger .item_key = batch_transform .id
476+
336477 async def _handle_job_trigger (
337478 self , value : Any , resume_trigger : UiPathResumeTrigger , uipath : UiPath
338479 ) -> None :
0 commit comments