diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py index e0cb6de4..60875a0e 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/agent_application.py @@ -683,8 +683,8 @@ async def _on_turn(self, context: TurnContext): try: if context.activity.type != ActivityTypes.typing: if self._options.start_typing_timer: - typing = TypingIndicator() - await typing.start(context) + typing = TypingIndicator(context) + typing.start() self._remove_mentions(context) @@ -733,7 +733,7 @@ async def _on_turn(self, context: TurnContext): await self._on_error(context, err) finally: if typing: - await typing.stop() + typing.stop() def _remove_mentions(self, context: TurnContext): if ( diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py index f60d8b45..e3841e85 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/typing_indicator.py @@ -4,9 +4,9 @@ """ from __future__ import annotations + import asyncio import logging - from typing import Optional from microsoft_agents.hosting.core import TurnContext @@ -18,61 +18,64 @@ class TypingIndicator: """ Encapsulates the logic for sending "typing" activity to the user. + + Scoped to a single turn of conversation with the user. """ - def __init__(self, intervalSeconds=1) -> None: - self._intervalSeconds = intervalSeconds - self._task: Optional[asyncio.Task] = None - self._running: bool = False - self._lock = asyncio.Lock() + def __init__(self, context: TurnContext, interval_seconds: float = 10.0) -> None: + """Initializes a new instance of the TypingIndicator class. - async def start(self, context: TurnContext) -> None: - async with self._lock: - if self._running: - return + :param context: The turn context. + :param interval_seconds: The interval in seconds between typing indicators. + """ + if interval_seconds <= 0: + raise ValueError("interval_seconds must be greater than 0") + self._context: TurnContext = context + self._interval: float = interval_seconds + self._task: Optional[asyncio.Task[None]] = None - logger.debug( - f"Starting typing indicator with interval: {self._intervalSeconds} seconds" - ) - self._running = True - self._task = asyncio.create_task(self._typing_loop(context)) - - async def stop(self) -> None: - async with self._lock: - if not self._running: - return - - logger.debug("Stopping typing indicator") - self._running = False - task = self._task - self._task = None - - # Cancel outside the lock to avoid blocking - if task and not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - async def _typing_loop(self, context: TurnContext): - """Continuously send typing indicators at the specified interval.""" + async def _run(self) -> None: + """Sends typing indicators at regular intervals.""" + + running_task = self._task try: - while True: - # Check running status under lock - async with self._lock: - if not self._running: - break - - try: - logger.debug("Sending typing activity") - await context.send_activity(Activity(type=ActivityTypes.typing)) - except Exception as e: - logger.error(f"Error sending typing activity: {e}") - async with self._lock: - self._running = False - break - - await asyncio.sleep(self._intervalSeconds) + while running_task is self._task: + await self._context.send_activity(Activity(type=ActivityTypes.typing)) + await asyncio.sleep(self._interval) except asyncio.CancelledError: - logger.debug("Typing indicator loop cancelled") + # Task was cancelled, exit gracefully + pass + + def start(self) -> None: + """Starts sending typing indicators.""" + + if self._task is not None: + logger.warning( + "Typing indicator is already running for conversation %s", + self._context.activity.conversation.id, + ) + return + + logger.debug( + "Starting typing indicator with interval: %s seconds in conversation %s", + self._interval, + self._context.activity.conversation.id, + ) + self._task = asyncio.create_task(self._run()) + + def stop(self) -> None: + """Stops sending typing indicators.""" + + if self._task is None: + logger.warning( + "Typing indicator is not running for conversation %s", + self._context.activity.conversation.id, + ) + return + + logger.debug( + "Stopping typing indicator for conversation %s", + self._context.activity.conversation.id, + ) + self._task.cancel() + self._task = None diff --git a/tests/hosting_core/app/test_typing_indicator.py b/tests/hosting_core/app/test_typing_indicator.py index c58c4ed4..70b9d75b 100644 --- a/tests/hosting_core/app/test_typing_indicator.py +++ b/tests/hosting_core/app/test_typing_indicator.py @@ -1,3 +1,8 @@ +""" +Copyright (c) Microsoft Corporation. All rights reserved. +Licensed under the MIT License. +""" + import asyncio import pytest @@ -12,6 +17,7 @@ class StubTurnContext: def __init__(self, should_raise: bool = False) -> None: self.sent_activities = [] self.should_raise = should_raise + self.activity = Activity(type="text", conversation={"id": "test_convo"}) async def send_activity(self, activity: Activity): if self.should_raise: @@ -22,57 +28,104 @@ async def send_activity(self, activity: Activity): @pytest.mark.asyncio async def test_start_sends_typing_activity(): + """Test that start() begins sending typing activities at regular interval_secondss.""" context = StubTurnContext() - indicator = TypingIndicator(intervalSeconds=0.01) + indicator = TypingIndicator(context, interval_seconds=0.01) # 10ms interval_seconds - await indicator.start(context) - await asyncio.sleep(0.03) - await indicator.stop() + indicator.start() + await asyncio.sleep(0.05) # Wait 50ms to allow multiple typing activities + indicator.stop() - assert len(context.sent_activities) >= 1 + # Should have sent at least 3 typing activities (50ms / 10ms = 5, but accounting for timing) + assert len(context.sent_activities) >= 3 assert all( activity.type == ActivityTypes.typing for activity in context.sent_activities ) @pytest.mark.asyncio -async def test_start_is_idempotent(): +async def test_start_creates_task(): + """Test that start() creates an asyncio task.""" context = StubTurnContext() - indicator = TypingIndicator(intervalSeconds=0.01) + indicator = TypingIndicator(context) + + indicator.start() - await indicator.start(context) - first_task = indicator._task # noqa: SLF001 - accessing for test verification + assert indicator._task is not None + assert isinstance(indicator._task, asyncio.Task) - await indicator.start(context) - second_task = indicator._task # noqa: SLF001 + indicator.stop() - assert first_task is second_task - await indicator.stop() +@pytest.mark.asyncio +async def test_start_if_already_running(): + """Test that start() is idempotent if already running.""" + context = StubTurnContext() + indicator = TypingIndicator(context) + + indicator.start() + indicator.start() + + indicator.stop() @pytest.mark.asyncio -async def test_stop_without_start_is_noop(): - indicator = TypingIndicator() +async def test_stop_if_not_running(): + """Test that stop() is idempotent if not running.""" + context = StubTurnContext() + indicator = TypingIndicator(context) + indicator.stop() + - await indicator.stop() +@pytest.mark.asyncio +async def test_stop_prevents_further_typing_activities(): + """Test that stop() prevents further typing activities from being sent.""" + context = StubTurnContext() + indicator = TypingIndicator(context, interval_seconds=0.01) - assert indicator._task is None # noqa: SLF001 - assert indicator._running is False # noqa: SLF001 + indicator.start() + await asyncio.sleep(0.025) # Let it run briefly + indicator.stop() + + count_before = len(context.sent_activities) + await asyncio.sleep(0.03) # Wait more time + count_after = len(context.sent_activities) + + assert count_before == count_after # No new activities after stop @pytest.mark.asyncio -async def test_typing_loop_stops_on_send_error(): - context = StubTurnContext(should_raise=True) - indicator = TypingIndicator(intervalSeconds=0.01) +async def test_multiple_start_stop_cycles(): + """Test that the indicator can be started and stopped multiple times.""" + context = StubTurnContext() + indicator = TypingIndicator(context, interval_seconds=0.01) + + # First cycle + indicator.start() + await asyncio.sleep(0.02) + indicator.stop() + count_first = len(context.sent_activities) - await indicator.start(context) + # Second cycle + indicator.start() await asyncio.sleep(0.02) + indicator.stop() + count_second = len(context.sent_activities) - assert indicator._task is not None # noqa: SLF001 - await asyncio.wait_for(indicator._task, timeout=0.1) # Ensure loop exits + assert count_second > count_first - assert indicator._running is False # noqa: SLF001 - assert indicator._task.done() # noqa: SLF001 - await indicator.stop() +@pytest.mark.asyncio +async def test_typing_activity_format(): + """Test that sent activities are properly formatted typing activities.""" + context = StubTurnContext() + indicator = TypingIndicator(context, interval_seconds=0.01) + + indicator.start() + await asyncio.sleep(0.015) # Wait for at least one activity + indicator.stop() + + assert len(context.sent_activities) >= 1 + for activity in context.sent_activities: + assert isinstance(activity, Activity) + assert activity.type == ActivityTypes.typing