Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def __init__(
:param kwargs: Additional configuration parameters.
:type kwargs: Any
"""
self.typing = TypingIndicator()
self._route_list = _RouteList[StateT]()

configuration = kwargs
Expand Down Expand Up @@ -659,9 +658,12 @@ async def on_turn(self, context: TurnContext):
await self._start_long_running_call(context, self._on_turn)

async def _on_turn(self, context: TurnContext):
typing = None
try:
if context.activity.type != ActivityTypes.typing:
await self._start_typing(context)
if self._options.start_typing_timer:
typing = TypingIndicator()
await typing.start(context)

self._remove_mentions(context)

Expand Down Expand Up @@ -709,11 +711,8 @@ async def _on_turn(self, context: TurnContext):
)
await self._on_error(context, err)
finally:
self.typing.stop()

async def _start_typing(self, context: TurnContext):
if self._options.start_typing_timer:
await self.typing.start(context)
if typing:
await typing.stop()

def _remove_mentions(self, context: TurnContext):
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"""

from __future__ import annotations
import asyncio
import logging

from threading import Timer
from typing import Optional

from microsoft_agents.hosting.core import TurnContext
Expand All @@ -20,36 +20,60 @@ class TypingIndicator:
Encapsulates the logic for sending "typing" activity to the user.
"""

_interval: int
_timer: Optional[Timer] = None

def __init__(self, interval=1000) -> None:
self._interval = interval
def __init__(self, intervalSeconds=1) -> None:
self._intervalSeconds = intervalSeconds
self._task: Optional[asyncio.Task] = None
self._running: bool = False
self._lock = asyncio.Lock()

async def start(self, context: TurnContext) -> None:
if self._timer is not None:
return
async with self._lock:
if self._running:
return

logger.debug(
f"Starting typing indicator with interval: {self._intervalSeconds} seconds"
)
self._running = True
self._task = asyncio.create_task(self._typing_loop(context))

logger.debug(f"Starting typing indicator with interval: {self._interval} ms")
func = self._on_timer(context)
self._timer = Timer(self._interval, func)
self._timer.start()
await func()
async def stop(self) -> None:
async with self._lock:
if not self._running:
return

def stop(self) -> None:
if self._timer:
logger.debug("Stopping typing indicator")
self._timer.cancel()
self._timer = None
self._running = False
task = self._task
self._task = None

def _on_timer(self, context: TurnContext):
async def __call__():
# Cancel outside the lock to avoid blocking
if task and not task.done():
task.cancel()
try:
logger.debug("Sending typing activity")
await context.send_activity(Activity(type=ActivityTypes.typing))
except Exception as e:
# TODO: Improve when adding logging
logger.error(f"Error sending typing activity: {e}")
self.stop()

return __call__
await task
except asyncio.CancelledError:
pass

async def _typing_loop(self, context: TurnContext):
"""Continuously send typing indicators at the specified interval."""
try:
while True:
# Check running status under lock
async with self._lock:
if not self._running:
break

try:
logger.debug("Sending typing activity")
await context.send_activity(Activity(type=ActivityTypes.typing))
except Exception as e:
logger.error(f"Error sending typing activity: {e}")
async with self._lock:
self._running = False
break

await asyncio.sleep(self._intervalSeconds)
except asyncio.CancelledError:
logger.debug("Typing indicator loop cancelled")
raise
76 changes: 76 additions & 0 deletions tests/hosting_core/app/test_typing_indicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import asyncio

import pytest

from microsoft_agents.activity import Activity, ActivityTypes
from microsoft_agents.hosting.core.app.typing_indicator import TypingIndicator


class StubTurnContext:
"""Test double that tracks sent activities."""

def __init__(self, should_raise: bool = False) -> None:
self.sent_activities = []
self.should_raise = should_raise

async def send_activity(self, activity: Activity):
if self.should_raise:
raise RuntimeError("send_activity failure")
self.sent_activities.append(activity)
return None


@pytest.mark.asyncio
async def test_start_sends_typing_activity():
context = StubTurnContext()
indicator = TypingIndicator(intervalSeconds=0.01)

await indicator.start(context)
await asyncio.sleep(0.03)
await indicator.stop()

assert len(context.sent_activities) >= 1
assert all(activity.type == ActivityTypes.typing for activity in context.sent_activities)


@pytest.mark.asyncio
async def test_start_is_idempotent():
context = StubTurnContext()
indicator = TypingIndicator(intervalSeconds=0.01)

await indicator.start(context)
first_task = indicator._task # noqa: SLF001 - accessing for test verification

await indicator.start(context)
second_task = indicator._task # noqa: SLF001

assert first_task is second_task

await indicator.stop()


@pytest.mark.asyncio
async def test_stop_without_start_is_noop():
indicator = TypingIndicator()

await indicator.stop()

assert indicator._task is None # noqa: SLF001
assert indicator._running is False # noqa: SLF001


@pytest.mark.asyncio
async def test_typing_loop_stops_on_send_error():
context = StubTurnContext(should_raise=True)
indicator = TypingIndicator(intervalSeconds=0.01)

await indicator.start(context)
await asyncio.sleep(0.02)

assert indicator._task is not None # noqa: SLF001
await asyncio.wait_for(indicator._task, timeout=0.1) # Ensure loop exits

assert indicator._running is False # noqa: SLF001
assert indicator._task.done() # noqa: SLF001

await indicator.stop()