Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5eef4df
TypingIndicator simplification
rodrigobr-msft Oct 29, 2025
282d75a
Merge branch 'main' of https://github.com/microsoft/Agents-for-python…
rodrigobr-msft Nov 6, 2025
20a5ad4
Adjusting TypingIndicator unit tests to fit new API
rodrigobr-msft Nov 6, 2025
95218b1
Formatting tests
rodrigobr-msft Nov 6, 2025
7c521c4
Async version of typing indicator
rodrigobr-msft Nov 11, 2025
236c07a
Unit tests
rodrigobr-msft Nov 11, 2025
241b676
Finalizing TypingIndicator tests
rodrigobr-msft Nov 11, 2025
8f99968
Typing indicator construction fix
rodrigobr-msft Nov 11, 2025
feb2c2b
Update libraries/microsoft-agents-hosting-core/microsoft_agents/hosti…
rodrigobr-msft Nov 11, 2025
efe9e24
Another commit
rodrigobr-msft Nov 11, 2025
5fadd08
Merge branch 'users/robrandao/typing-indicator' of https://github.com…
rodrigobr-msft Nov 11, 2025
43f1ccf
Reformatting
rodrigobr-msft Nov 11, 2025
700aa5a
Update tests/hosting_core/app/test_typing_indicator.py
rodrigobr-msft Nov 11, 2025
44bc035
Update libraries/microsoft-agents-hosting-core/microsoft_agents/hosti…
rodrigobr-msft Nov 11, 2025
6856f3d
Addressing PR comments
rodrigobr-msft Nov 11, 2025
eaf5e09
Addressing PR comment
rodrigobr-msft Nov 11, 2025
56dd761
Idempotency of start and stop and logging changes
rodrigobr-msft Nov 12, 2025
ea305dc
Merge branch 'main' into users/robrandao/typing-indicator
rodrigobr-msft Nov 12, 2025
d36c9e2
Quick bug fix in start
rodrigobr-msft Nov 12, 2025
338cd5a
Merge branch 'main' of https://github.com/microsoft/Agents-for-python…
rodrigobr-msft Nov 12, 2025
25065ba
Merge branch 'users/robrandao/typing-indicator' of https://github.com…
rodrigobr-msft Nov 12, 2025
e135ef6
Merge branch 'main' into users/robrandao/typing-indicator
rodrigobr-msft Nov 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,8 @@ async def _on_turn(self, context: TurnContext):
try:
if context.activity.type != ActivityTypes.typing:
if self._options.start_typing_timer:
typing = TypingIndicator()
await typing.start(context)
typing = TypingIndicator(context)
typing.start()

self._remove_mentions(context)

Expand Down Expand Up @@ -733,7 +733,7 @@ async def _on_turn(self, context: TurnContext):
await self._on_error(context, err)
finally:
if typing:
await typing.stop()
typing.stop()

def _remove_mentions(self, context: TurnContext):
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"""

from __future__ import annotations

import asyncio
import logging

from typing import Optional

from microsoft_agents.hosting.core import TurnContext
Expand All @@ -18,61 +18,64 @@
class TypingIndicator:
"""
Encapsulates the logic for sending "typing" activity to the user.

Scoped to a single turn of conversation with the user.
"""

def __init__(self, intervalSeconds=1) -> None:
self._intervalSeconds = intervalSeconds
self._task: Optional[asyncio.Task] = None
self._running: bool = False
self._lock = asyncio.Lock()
def __init__(self, context: TurnContext, interval_seconds: float = 10.0) -> None:
"""Initializes a new instance of the TypingIndicator class.

async def start(self, context: TurnContext) -> None:
async with self._lock:
if self._running:
return
:param context: The turn context.
:param interval_seconds: The interval in seconds between typing indicators.
"""
if interval_seconds <= 0:
raise ValueError("interval_seconds must be greater than 0")
self._context: TurnContext = context
self._interval: float = interval_seconds
self._task: Optional[asyncio.Task[None]] = None

logger.debug(
f"Starting typing indicator with interval: {self._intervalSeconds} seconds"
)
self._running = True
self._task = asyncio.create_task(self._typing_loop(context))

async def stop(self) -> None:
async with self._lock:
if not self._running:
return

logger.debug("Stopping typing indicator")
self._running = False
task = self._task
self._task = None

# Cancel outside the lock to avoid blocking
if task and not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

async def _typing_loop(self, context: TurnContext):
"""Continuously send typing indicators at the specified interval."""
async def _run(self) -> None:
"""Sends typing indicators at regular intervals."""

running_task = self._task
try:
while True:
# Check running status under lock
async with self._lock:
if not self._running:
break

try:
logger.debug("Sending typing activity")
await context.send_activity(Activity(type=ActivityTypes.typing))
except Exception as e:
logger.error(f"Error sending typing activity: {e}")
async with self._lock:
self._running = False
break

await asyncio.sleep(self._intervalSeconds)
while running_task is self._task:
await self._context.send_activity(Activity(type=ActivityTypes.typing))
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing error handling in the _run() method. If send_activity() raises an exception (e.g., network error, bot framework error), the task will crash and stop sending typing indicators without any logging or graceful handling.

The old implementation caught exceptions and logged them. Consider adding try-except handling around the send_activity call:

try:
    await self._context.send_activity(Activity(type=ActivityTypes.typing))
except Exception as e:
    logger.error("Error sending typing activity: %s", e)
    # Optionally: break or continue depending on desired behavior
Suggested change
await self._context.send_activity(Activity(type=ActivityTypes.typing))
try:
await self._context.send_activity(Activity(type=ActivityTypes.typing))
except Exception as e:
logger.error("Error sending typing activity: %s", e)

Copilot uses AI. Check for mistakes.
await asyncio.sleep(self._interval)
Comment on lines +37 to +44
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a race condition in the _run method. If stop() is called immediately after start() but before the task begins executing _run(), then when _run() starts:

  • Line 40: running_task = self._task will be None (since stop() set it to None)
  • Line 42: while None is self._task evaluates to while None is None, which is True
  • The loop will execute and try to send a typing activity even though stop() was already called

A more robust approach would be to initialize running_task to the current task object:

async def _run(self) -> None:
    """Sends typing indicators at regular intervals."""
    running_task = asyncio.current_task()
    try:
        while running_task is self._task:
            await self._context.send_activity(Activity(type=ActivityTypes.typing))
            await asyncio.sleep(self._interval)
    except asyncio.CancelledError:
        pass

This ensures the identity check works correctly even if stop() is called before _run() executes.

Copilot uses AI. Check for mistakes.
except asyncio.CancelledError:
logger.debug("Typing indicator loop cancelled")
# Task was cancelled, exit gracefully
pass

def start(self) -> None:
"""Starts sending typing indicators."""

if self._task is not None:
logger.warning(
"Typing indicator is already running for conversation %s",
self._context.activity.conversation.id,
)
return

logger.debug(
"Starting typing indicator with interval: %s seconds in conversation %s",
self._interval,
self._context.activity.conversation.id,
)
self._task = asyncio.create_task(self._run())

def stop(self) -> None:
"""Stops sending typing indicators."""

if self._task is None:
logger.warning(
"Typing indicator is not running for conversation %s",
self._context.activity.conversation.id,
)
return

logger.debug(
"Stopping typing indicator for conversation %s",
self._context.activity.conversation.id,
)
self._task.cancel()
self._task = None
107 changes: 80 additions & 27 deletions tests/hosting_core/app/test_typing_indicator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License.
"""

import asyncio

import pytest
Expand All @@ -12,6 +17,7 @@ class StubTurnContext:
def __init__(self, should_raise: bool = False) -> None:
self.sent_activities = []
self.should_raise = should_raise
self.activity = Activity(type="text", conversation={"id": "test_convo"})

async def send_activity(self, activity: Activity):
if self.should_raise:
Expand All @@ -22,57 +28,104 @@ async def send_activity(self, activity: Activity):

@pytest.mark.asyncio
async def test_start_sends_typing_activity():
"""Test that start() begins sending typing activities at regular interval_secondss."""
context = StubTurnContext()
indicator = TypingIndicator(intervalSeconds=0.01)
indicator = TypingIndicator(context, interval_seconds=0.01) # 10ms interval_seconds
Copy link

Copilot AI Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inline comment has a typo: "10ms interval_seconds" should be "10ms interval" (the parameter name shouldn't appear in the comment describing the value).

Suggested change
indicator = TypingIndicator(context, interval_seconds=0.01) # 10ms interval_seconds
indicator = TypingIndicator(context, interval_seconds=0.01) # 10ms interval

Copilot uses AI. Check for mistakes.

await indicator.start(context)
await asyncio.sleep(0.03)
await indicator.stop()
indicator.start()
await asyncio.sleep(0.05) # Wait 50ms to allow multiple typing activities
indicator.stop()

assert len(context.sent_activities) >= 1
# Should have sent at least 3 typing activities (50ms / 10ms = 5, but accounting for timing)
assert len(context.sent_activities) >= 3
assert all(
activity.type == ActivityTypes.typing for activity in context.sent_activities
)


@pytest.mark.asyncio
async def test_start_is_idempotent():
async def test_start_creates_task():
"""Test that start() creates an asyncio task."""
context = StubTurnContext()
indicator = TypingIndicator(intervalSeconds=0.01)
indicator = TypingIndicator(context)

indicator.start()

await indicator.start(context)
first_task = indicator._task # noqa: SLF001 - accessing for test verification
assert indicator._task is not None
assert isinstance(indicator._task, asyncio.Task)

await indicator.start(context)
second_task = indicator._task # noqa: SLF001
indicator.stop()

assert first_task is second_task

await indicator.stop()
@pytest.mark.asyncio
async def test_start_if_already_running():
"""Test that start() is idempotent if already running."""
context = StubTurnContext()
indicator = TypingIndicator(context)

indicator.start()
indicator.start()

indicator.stop()


@pytest.mark.asyncio
async def test_stop_without_start_is_noop():
indicator = TypingIndicator()
async def test_stop_if_not_running():
"""Test that stop() is idempotent if not running."""
context = StubTurnContext()
indicator = TypingIndicator(context)
indicator.stop()


await indicator.stop()
@pytest.mark.asyncio
async def test_stop_prevents_further_typing_activities():
"""Test that stop() prevents further typing activities from being sent."""
context = StubTurnContext()
indicator = TypingIndicator(context, interval_seconds=0.01)

assert indicator._task is None # noqa: SLF001
assert indicator._running is False # noqa: SLF001
indicator.start()
await asyncio.sleep(0.025) # Let it run briefly
indicator.stop()

count_before = len(context.sent_activities)
await asyncio.sleep(0.03) # Wait more time
count_after = len(context.sent_activities)

assert count_before == count_after # No new activities after stop


@pytest.mark.asyncio
async def test_typing_loop_stops_on_send_error():
context = StubTurnContext(should_raise=True)
indicator = TypingIndicator(intervalSeconds=0.01)
async def test_multiple_start_stop_cycles():
"""Test that the indicator can be started and stopped multiple times."""
context = StubTurnContext()
indicator = TypingIndicator(context, interval_seconds=0.01)

# First cycle
indicator.start()
await asyncio.sleep(0.02)
indicator.stop()
count_first = len(context.sent_activities)

await indicator.start(context)
# Second cycle
indicator.start()
await asyncio.sleep(0.02)
indicator.stop()
count_second = len(context.sent_activities)

assert indicator._task is not None # noqa: SLF001
await asyncio.wait_for(indicator._task, timeout=0.1) # Ensure loop exits
assert count_second > count_first

assert indicator._running is False # noqa: SLF001
assert indicator._task.done() # noqa: SLF001

await indicator.stop()
@pytest.mark.asyncio
async def test_typing_activity_format():
"""Test that sent activities are properly formatted typing activities."""
context = StubTurnContext()
indicator = TypingIndicator(context, interval_seconds=0.01)

indicator.start()
await asyncio.sleep(0.015) # Wait for at least one activity
indicator.stop()

assert len(context.sent_activities) >= 1
for activity in context.sent_activities:
assert isinstance(activity, Activity)
assert activity.type == ActivityTypes.typing
Loading