33import asyncio
44import logging
55import os
6+ import uuid
67from typing import Any
78from urllib .parse import urlparse
89
1213 UiPathConversationEvent ,
1314 UiPathConversationExchangeEndEvent ,
1415 UiPathConversationExchangeEvent ,
16+ UiPathConversationInterruptEvent ,
17+ UiPathConversationInterruptStartEvent ,
18+ UiPathConversationMessageEvent ,
1519)
20+ from uipath .runtime import UiPathRuntimeResult
1621from uipath .runtime .chat import UiPathChatProtocol
1722from uipath .runtime .context import UiPathRuntimeContext
1823
@@ -51,6 +56,7 @@ def __init__(
5156 self .headers = headers
5257 self ._client : AsyncClient | None = None
5358 self ._connected_event = asyncio .Event ()
59+ self ._waiting_for_resume = False
5460
5561 async def connect (self , timeout : float = 10.0 ) -> None :
5662 """Establish WebSocket connection to the server.
@@ -127,23 +133,7 @@ async def disconnect(self) -> None:
127133 logger .warning ("WebSocket client not connected" )
128134 return
129135
130- # Send exchange end event using stored IDs
131- if self ._client and self ._connected_event .is_set ():
132- try :
133- end_event = UiPathConversationEvent (
134- conversation_id = self .conversation_id ,
135- exchange = UiPathConversationExchangeEvent (
136- exchange_id = self .exchange_id ,
137- end = UiPathConversationExchangeEndEvent (),
138- ),
139- )
140- event_data = end_event .model_dump (
141- mode = "json" , exclude_none = True , by_alias = True
142- )
143- await self ._client .emit ("ConversationEvent" , event_data )
144- logger .info ("Exchange end event sent" )
145- except Exception as e :
146- logger .warning (f"Error sending exchange end event: { e } " )
136+ await self .emit_exchange_end_event ()
147137
148138 try :
149139 logger .info ("Disconnecting from WebSocket server" )
@@ -154,7 +144,9 @@ async def disconnect(self) -> None:
154144 finally :
155145 await self ._cleanup_client ()
156146
157- async def emit_message_event (self , message_event : Any ) -> None :
147+ async def emit_message_event (
148+ self , message_event : UiPathConversationMessageEvent
149+ ) -> None :
158150 """Wrap and send a message event to the WebSocket server.
159151
160152 Args:
@@ -169,6 +161,9 @@ async def emit_message_event(self, message_event: Any) -> None:
169161 if not self ._connected_event .is_set ():
170162 raise RuntimeError ("WebSocket client not in connected state" )
171163
164+ # Store the current message ID, used for emitting interrupt events.
165+ self ._current_message_id = message_event .message_id
166+
172167 try :
173168 # Wrap message event with conversation/exchange IDs
174169 wrapped_event = UiPathConversationEvent (
@@ -191,6 +186,84 @@ async def emit_message_event(self, message_event: Any) -> None:
191186 logger .error (f"Error sending conversation event to WebSocket: { e } " )
192187 raise RuntimeError (f"Failed to send conversation event: { e } " ) from e
193188
189+ async def emit_exchange_end_event (self ):
190+ # Send exchange end event using stored IDs
191+ if self ._client and self ._connected_event .is_set ():
192+ try :
193+ end_event = UiPathConversationEvent (
194+ conversation_id = self .conversation_id ,
195+ exchange = UiPathConversationExchangeEvent (
196+ exchange_id = self .exchange_id ,
197+ end = UiPathConversationExchangeEndEvent (),
198+ ),
199+ )
200+ event_data = end_event .model_dump (
201+ mode = "json" , exclude_none = True , by_alias = True
202+ )
203+ await self ._client .emit ("ConversationEvent" , event_data )
204+ logger .info ("Exchange end event sent" )
205+ except Exception as e :
206+ logger .warning (f"Error sending exchange end event: { e } " )
207+
208+ async def emit_interrupt_event (self , runtime_result : UiPathRuntimeResult ):
209+ # Send startInterrupt event using stored ID's
210+ if self ._client and self ._connected_event .is_set ():
211+ try :
212+
213+ self ._interrupt_id = str (uuid .uuid4 ())
214+
215+ interrupt_event = UiPathConversationEvent (
216+ conversation_id = self .conversation_id ,
217+ exchange = UiPathConversationExchangeEvent (
218+ exchange_id = self .exchange_id ,
219+ message = UiPathConversationMessageEvent (
220+ message_id = self ._current_message_id ,
221+ interrupt = UiPathConversationInterruptEvent (
222+ interrupt_id = self ._interrupt_id ,
223+ start = UiPathConversationInterruptStartEvent (
224+ type = "coded-agent-test" , value = runtime_result .output
225+ ),
226+ ),
227+ ),
228+ ),
229+ )
230+ event_data = interrupt_event .model_dump (
231+ mode = "json" , exclude_none = True , by_alias = True
232+ )
233+ await self ._client .emit ("ConversationEvent" , event_data )
234+ logger .info ("Interrupt event sent" )
235+ except Exception as e :
236+ logger .warning (f"Error sending interrupt event: { e } " )
237+
238+ async def wait_for_resume (self ) -> dict [str , Any ]:
239+ """Wait for the interrupt_end event to be received.
240+
241+ Returns:
242+ Resume data from the interrupt end event
243+ """
244+ if self ._client is None :
245+ raise RuntimeError ("WebSocket client not connected" )
246+
247+ # Initialize resume event and data
248+ self ._resume_event = asyncio .Event ()
249+ self ._resume_data = None
250+ self ._waiting_for_resume = True
251+
252+ # Register handler for interrupt events
253+ self ._client .on ("ConversationEvent" , self ._handle_conversation_event )
254+
255+ try :
256+ # Wait for the resume event to be signaled
257+ await self ._resume_event .wait ()
258+
259+ # Return the resume data
260+ resume_data = self ._resume_data or {}
261+
262+ return resume_data
263+ finally :
264+ # Clear the waiting flag
265+ self ._waiting_for_resume = False
266+
194267 @property
195268 def is_connected (self ) -> bool :
196269 """Check if the WebSocket is currently connected.
@@ -214,6 +287,38 @@ async def _handle_connect_error(self, data: Any) -> None:
214287 """Handle connection error event."""
215288 logger .error (f"WebSocket connection error: { data } " )
216289
290+ async def _handle_conversation_event (self , data : Any , * args : Any ) -> None :
291+ """Handle incoming conversation event from the server.
292+
293+ Args:
294+ data: The incoming conversation event data (JSON)
295+ *args: Additional arguments from Socket.IO
296+ """
297+ # Only process events when actively waiting for resume
298+ if not self ._waiting_for_resume :
299+ return
300+
301+ try :
302+ # Parse the incoming event as a UiPathConversationEvent
303+ event = UiPathConversationEvent .model_validate (data )
304+
305+ if isinstance (event .exchange , UiPathConversationExchangeEvent ):
306+ message = event .exchange .message
307+ if message and message .message_id == self ._current_message_id :
308+ if message .interrupt :
309+ if (
310+ message .interrupt .interrupt_id
311+ == self ._interrupt_id
312+ ):
313+ if message .interrupt .end :
314+ # Extract resume data from the end event
315+ # end is already a dict (typed as Any), no need to call model_dump
316+ self ._resume_data = message .interrupt .end
317+ self ._resume_event .set ()
318+ logger .info ("Resume event received" )
319+ except Exception as e :
320+ logger .error (f"Error handling conversation event: { e } " )
321+
217322 async def _cleanup_client (self ) -> None :
218323 """Clean up client resources."""
219324 self ._connected_event .clear ()
0 commit comments