diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py index b09605f1..ac777846 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py @@ -93,7 +93,6 @@ def __init__( :param kwargs: Additional configuration parameters. :type kwargs: Any """ - self.typing = TypingIndicator() self._route_list = _RouteList[StateT]() configuration = kwargs @@ -659,9 +658,12 @@ async def on_turn(self, context: TurnContext): await self._start_long_running_call(context, self._on_turn) async def _on_turn(self, context: TurnContext): + typing = None try: if context.activity.type != ActivityTypes.typing: - await self._start_typing(context) + if self._options.start_typing_timer: + typing = TypingIndicator() + await typing.start(context) self._remove_mentions(context) @@ -709,11 +711,8 @@ async def _on_turn(self, context: TurnContext): ) await self._on_error(context, err) finally: - self.typing.stop() - - async def _start_typing(self, context: TurnContext): - if self._options.start_typing_timer: - await self.typing.start(context) + if typing: + await typing.stop() def _remove_mentions(self, context: TurnContext): if ( diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py index b33c568f..24b3c0a0 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py @@ -4,9 +4,9 @@ """ from __future__ import annotations +import asyncio import logging -from threading import Timer from typing import Optional from microsoft_agents.hosting.core import TurnContext @@ -20,36 +20,60 @@ class TypingIndicator: Encapsulates the logic for sending "typing" activity to the user. """ - _interval: int - _timer: Optional[Timer] = None - - def __init__(self, interval=1000) -> None: - self._interval = interval + def __init__(self, intervalSeconds=1) -> None: + self._intervalSeconds = intervalSeconds + self._task: Optional[asyncio.Task] = None + self._running: bool = False + self._lock = asyncio.Lock() async def start(self, context: TurnContext) -> None: - if self._timer is not None: - return + async with self._lock: + if self._running: + return + + logger.debug( + f"Starting typing indicator with interval: {self._intervalSeconds} seconds" + ) + self._running = True + self._task = asyncio.create_task(self._typing_loop(context)) - logger.debug(f"Starting typing indicator with interval: {self._interval} ms") - func = self._on_timer(context) - self._timer = Timer(self._interval, func) - self._timer.start() - await func() + async def stop(self) -> None: + async with self._lock: + if not self._running: + return - def stop(self) -> None: - if self._timer: logger.debug("Stopping typing indicator") - self._timer.cancel() - self._timer = None + self._running = False + task = self._task + self._task = None - def _on_timer(self, context: TurnContext): - async def __call__(): + # Cancel outside the lock to avoid blocking + if task and not task.done(): + task.cancel() try: - logger.debug("Sending typing activity") - await context.send_activity(Activity(type=ActivityTypes.typing)) - except Exception as e: - # TODO: Improve when adding logging - logger.error(f"Error sending typing activity: {e}") - self.stop() - - return __call__ + await task + except asyncio.CancelledError: + pass + + async def _typing_loop(self, context: TurnContext): + """Continuously send typing indicators at the specified interval.""" + try: + while True: + # Check running status under lock + async with self._lock: + if not self._running: + break + + try: + logger.debug("Sending typing activity") + await context.send_activity(Activity(type=ActivityTypes.typing)) + except Exception as e: + logger.error(f"Error sending typing activity: {e}") + async with self._lock: + self._running = False + break + + await asyncio.sleep(self._intervalSeconds) + except asyncio.CancelledError: + logger.debug("Typing indicator loop cancelled") + raise diff --git a/tests/hosting_core/app/test_typing_indicator.py b/tests/hosting_core/app/test_typing_indicator.py new file mode 100644 index 00000000..22a09c8f --- /dev/null +++ b/tests/hosting_core/app/test_typing_indicator.py @@ -0,0 +1,76 @@ +import asyncio + +import pytest + +from microsoft_agents.activity import Activity, ActivityTypes +from microsoft_agents.hosting.core.app.typing_indicator import TypingIndicator + + +class StubTurnContext: + """Test double that tracks sent activities.""" + + def __init__(self, should_raise: bool = False) -> None: + self.sent_activities = [] + self.should_raise = should_raise + + async def send_activity(self, activity: Activity): + if self.should_raise: + raise RuntimeError("send_activity failure") + self.sent_activities.append(activity) + return None + + +@pytest.mark.asyncio +async def test_start_sends_typing_activity(): + context = StubTurnContext() + indicator = TypingIndicator(intervalSeconds=0.01) + + await indicator.start(context) + await asyncio.sleep(0.03) + await indicator.stop() + + assert len(context.sent_activities) >= 1 + assert all(activity.type == ActivityTypes.typing for activity in context.sent_activities) + + +@pytest.mark.asyncio +async def test_start_is_idempotent(): + context = StubTurnContext() + indicator = TypingIndicator(intervalSeconds=0.01) + + await indicator.start(context) + first_task = indicator._task # noqa: SLF001 - accessing for test verification + + await indicator.start(context) + second_task = indicator._task # noqa: SLF001 + + assert first_task is second_task + + await indicator.stop() + + +@pytest.mark.asyncio +async def test_stop_without_start_is_noop(): + indicator = TypingIndicator() + + await indicator.stop() + + assert indicator._task is None # noqa: SLF001 + assert indicator._running is False # noqa: SLF001 + + +@pytest.mark.asyncio +async def test_typing_loop_stops_on_send_error(): + context = StubTurnContext(should_raise=True) + indicator = TypingIndicator(intervalSeconds=0.01) + + await indicator.start(context) + await asyncio.sleep(0.02) + + assert indicator._task is not None # noqa: SLF001 + await asyncio.wait_for(indicator._task, timeout=0.1) # Ensure loop exits + + assert indicator._running is False # noqa: SLF001 + assert indicator._task.done() # noqa: SLF001 + + await indicator.stop()