33import asyncio
44import logging
55import os
6+ import uuid
67from typing import Any
78from urllib .parse import urlparse
89
1213 UiPathConversationEvent ,
1314 UiPathConversationExchangeEndEvent ,
1415 UiPathConversationExchangeEvent ,
16+ UiPathConversationInterruptEvent ,
17+ UiPathConversationInterruptStartEvent ,
18+ UiPathConversationMessageEvent ,
1519)
20+ from uipath .runtime import UiPathRuntimeResult
1621from uipath .runtime .chat import UiPathChatProtocol
1722from uipath .runtime .context import UiPathRuntimeContext
1823
@@ -127,23 +132,7 @@ async def disconnect(self) -> None:
127132 logger .warning ("WebSocket client not connected" )
128133 return
129134
130- # Send exchange end event using stored IDs
131- if self ._client and self ._connected_event .is_set ():
132- try :
133- end_event = UiPathConversationEvent (
134- conversation_id = self .conversation_id ,
135- exchange = UiPathConversationExchangeEvent (
136- exchange_id = self .exchange_id ,
137- end = UiPathConversationExchangeEndEvent (),
138- ),
139- )
140- event_data = end_event .model_dump (
141- mode = "json" , exclude_none = True , by_alias = True
142- )
143- await self ._client .emit ("ConversationEvent" , event_data )
144- logger .info ("Exchange end event sent" )
145- except Exception as e :
146- logger .warning (f"Error sending exchange end event: { e } " )
135+ await self .emit_exchange_end_event ()
147136
148137 try :
149138 logger .info ("Disconnecting from WebSocket server" )
@@ -154,7 +143,9 @@ async def disconnect(self) -> None:
154143 finally :
155144 await self ._cleanup_client ()
156145
157- async def emit_message_event (self , message_event : Any ) -> None :
146+ async def emit_message_event (
147+ self , message_event : UiPathConversationMessageEvent
148+ ) -> None :
158149 """Wrap and send a message event to the WebSocket server.
159150
160151 Args:
@@ -169,6 +160,9 @@ async def emit_message_event(self, message_event: Any) -> None:
169160 if not self ._connected_event .is_set ():
170161 raise RuntimeError ("WebSocket client not in connected state" )
171162
163+ # Store the current message ID, used for emitting interrupt events.
164+ self ._current_message_id = message_event .message_id
165+
172166 try :
173167 # Wrap message event with conversation/exchange IDs
174168 wrapped_event = UiPathConversationEvent (
@@ -191,6 +185,78 @@ async def emit_message_event(self, message_event: Any) -> None:
191185 logger .error (f"Error sending conversation event to WebSocket: { e } " )
192186 raise RuntimeError (f"Failed to send conversation event: { e } " ) from e
193187
188+ async def emit_exchange_end_event (self ):
189+ # Send exchange end event using stored IDs
190+ if self ._client and self ._connected_event .is_set ():
191+ try :
192+ end_event = UiPathConversationEvent (
193+ conversation_id = self .conversation_id ,
194+ exchange = UiPathConversationExchangeEndEvent (
195+ exchange_id = self .exchange_id
196+ ),
197+ )
198+ event_data = end_event .model_dump (
199+ mode = "json" , exclude_none = True , by_alias = True
200+ )
201+ await self ._client .emit ("ConversationEvent" , event_data )
202+ logger .info ("Exchange end event sent" )
203+ except Exception as e :
204+ logger .warning (f"Error sending exchange end event: { e } " )
205+
206+ async def emit_interrupt_event (self , runtime_result : UiPathRuntimeResult ):
207+ # Send startInterrupt event using stored ID's
208+ if self ._client and self ._connected_event .is_set ():
209+ try :
210+
211+ self ._interrupt_id = str (uuid .uuid4 ())
212+
213+ interrupt_event = UiPathConversationEvent (
214+ conversation_id = self .conversation_id ,
215+ exchange = UiPathConversationExchangeEvent (
216+ exchange_id = self .exchange_id ,
217+ message = UiPathConversationMessageEvent (
218+ message_id = self ._current_message_id ,
219+ interrupt = UiPathConversationInterruptEvent (
220+ interrupt_id = self ._interrupt_id ,
221+ start = UiPathConversationInterruptStartEvent (
222+ type = "coded-agent-test" , value = runtime_result .output
223+ ),
224+ ),
225+ ),
226+ ),
227+ )
228+ event_data = interrupt_event .model_dump (
229+ mode = "json" , exclude_none = True , by_alias = True
230+ )
231+ await self ._client .emit ("ConversationEvent" , event_data )
232+ logger .info ("Interrupt event sent" )
233+ except Exception as e :
234+ logger .warning (f"Error sending interrupt event: { e } " )
235+
236+ async def wait_for_resume (self ) -> dict [str , Any ]:
237+ """Wait for the interrupt_end event to be received.
238+
239+ Returns:
240+ Resume data from the interrupt end event
241+ """
242+ if self ._client is None :
243+ raise RuntimeError ("WebSocket client not connected" )
244+
245+ # Initialize resume event and data
246+ self ._resume_event = asyncio .Event ()
247+ self ._resume_data = None
248+
249+ # Register handler for interrupt events
250+ self ._client .on ("InterruptEvent" , self ._handle_conversation_event )
251+
252+ # Wait for the resume event to be signaled
253+ await self ._resume_event .wait ()
254+
255+ # Return the resume data
256+ resume_data = self ._resume_data or {}
257+
258+ return resume_data
259+
194260 @property
195261 def is_connected (self ) -> bool :
196262 """Check if the WebSocket is currently connected.
@@ -214,6 +280,34 @@ async def _handle_connect_error(self, data: Any) -> None:
214280 """Handle connection error event."""
215281 logger .error (f"WebSocket connection error: { data } " )
216282
283+ async def _handle_conversation_event (self , data : Any , * args : Any ) -> None :
284+ """Handle incoming conversation event from the server.
285+
286+ Args:
287+ data: The incoming conversation event data (JSON)
288+ *args: Additional arguments from Socket.IO
289+ """
290+ try :
291+ # Parse the incoming event as a UiPathConversationEvent
292+ event = UiPathConversationEvent .model_validate (data )
293+
294+ if isinstance (event .exchange , UiPathConversationExchangeEvent ):
295+ message = event .exchange .message
296+ if message and message .message_id == self ._current_message_id :
297+ if message .interrupt :
298+ if (
299+ message .interrupt .interrupt_id
300+ == self ._interrupt_id
301+ ):
302+ if message .interrupt .end :
303+ # Extract resume data from the end event
304+ # end is already a dict (typed as Any), no need to call model_dump
305+ self ._resume_data = message .interrupt .end
306+ self ._resume_event .set ()
307+ logger .info ("Resume event received" )
308+ except Exception as e :
309+ logger .error (f"Error handling conversation event: { e } " )
310+
217311 async def _cleanup_client (self ) -> None :
218312 """Clean up client resources."""
219313 self ._connected_event .clear ()
0 commit comments